1use arrow_array::{RecordBatch, RecordBatchReader};
8use arrow_schema::DataType;
9use byteorder::{ByteOrder, LittleEndian};
10use chrono::{prelude::*, Duration};
11use deepsize::DeepSizeOf;
12use futures::future::BoxFuture;
13use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
14use futures::{FutureExt, Stream};
15
16use crate::dataset::blob::blob_version_from_config;
17use crate::dataset::metadata::UpdateFieldMetadataBuilder;
18use crate::dataset::transaction::translate_schema_metadata_updates;
19use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey};
20use crate::session::index_caches::DSIndexCache;
21use itertools::Itertools;
22use lance_core::datatypes::{
23 BlobVersion, Field, OnMissing, OnTypeMismatch, Projectable, Projection,
24};
25use lance_core::traits::DatasetTakeRows;
26use lance_core::utils::address::RowAddress;
27use lance_core::utils::tracing::{
28 DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT,
29 TRACE_DATASET_EVENTS,
30};
31use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID_FIELD};
32use lance_datafusion::projection::ProjectionPlan;
33use lance_file::datatypes::populate_schema_dictionary;
34use lance_file::reader::FileReaderOptions;
35use lance_file::version::LanceFileVersion;
36use lance_index::DatasetIndexExt;
37use lance_io::object_store::{
38 LanceNamespaceStorageOptionsProvider, ObjectStore, ObjectStoreParams, StorageOptions,
39 StorageOptionsAccessor, StorageOptionsProvider,
40};
41use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct};
42use lance_namespace::LanceNamespace;
43use lance_table::format::{
44 pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta,
45};
46use lance_table::io::commit::{
47 migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler,
48 CommitLock, ManifestLocation, ManifestNamingScheme, VERSIONS_DIR,
49};
50use lance_table::io::manifest::{read_manifest, read_manifest_indexes};
51use object_store::path::Path;
52use prost::Message;
53use roaring::RoaringBitmap;
54use rowids::get_row_id_index;
55use serde::{Deserialize, Serialize};
56use snafu::location;
57use std::borrow::Cow;
58use std::collections::{BTreeMap, HashMap, HashSet};
59use std::fmt::Debug;
60use std::ops::Range;
61use std::pin::Pin;
62use std::sync::Arc;
63use take::row_offsets_to_row_addresses;
64use tracing::{info, instrument};
65
66pub(crate) mod blob;
67mod branch_location;
68pub mod builder;
69pub mod cleanup;
70pub mod delta;
71pub mod fragment;
72mod hash_joiner;
73pub mod index;
74mod metadata;
75pub mod optimize;
76pub mod progress;
77pub mod refs;
78pub(crate) mod rowids;
79pub mod scanner;
80mod schema_evolution;
81pub mod sql;
82pub mod statistics;
83mod take;
84pub mod transaction;
85pub mod udtf;
86pub mod updater;
87mod utils;
88pub mod write;
89
90use self::builder::DatasetBuilder;
91use self::cleanup::RemovalStats;
92use self::fragment::FileFragment;
93use self::refs::Refs;
94use self::scanner::{DatasetRecordBatchStream, Scanner};
95use self::transaction::{Operation, Transaction, TransactionBuilder, UpdateMapEntry};
96use self::write::write_fragments_internal;
97use crate::dataset::branch_location::BranchLocation;
98use crate::dataset::cleanup::{CleanupPolicy, CleanupPolicyBuilder};
99use crate::dataset::refs::{BranchContents, Branches, Tags};
100use crate::dataset::sql::SqlQueryBuilder;
101use crate::datatypes::Schema;
102use crate::index::retain_supported_indices;
103use crate::io::commit::{
104 commit_detached_transaction, commit_new_dataset, commit_transaction,
105 detect_overlapping_fragments,
106};
107use crate::session::Session;
108use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime};
109use crate::{Error, Result};
110pub use blob::BlobFile;
111use hash_joiner::HashJoiner;
112use lance_core::box_error;
113pub use lance_core::ROW_ID;
114use lance_namespace::models::{
115 CreateEmptyTableRequest, DeclareTableRequest, DeclareTableResponse, DescribeTableRequest,
116};
117use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
118use lance_table::io::deletion::{relative_deletion_file_path, DELETIONS_DIR};
119pub use schema_evolution::{
120 BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
121};
122pub use take::TakeBuilder;
123pub use write::merge_insert::{
124 MergeInsertBuilder, MergeInsertJob, MergeStats, UncommittedMergeInsert, WhenMatched,
125 WhenNotMatched, WhenNotMatchedBySource,
126};
127
128pub use write::update::{UpdateBuilder, UpdateJob};
129#[allow(deprecated)]
130pub use write::{
131 write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, InsertBuilder,
132 WriteDestination, WriteMode, WriteParams,
133};
134
// Well-known subdirectories under a dataset's root path.
pub(crate) const INDICES_DIR: &str = "_indices";
pub(crate) const DATA_DIR: &str = "data";
pub(crate) const TRANSACTIONS_DIR: &str = "_transactions";

/// Default capacity of the per-session index cache, in bytes (6 GiB).
pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024;
/// Default capacity of the per-session metadata cache, in bytes (1 GiB).
pub const DEFAULT_METADATA_CACHE_SIZE: usize = 1024 * 1024 * 1024;
146
#[derive(Clone)]
/// A handle to a Lance dataset, checked out at a specific manifest version.
///
/// Cloning is cheap: all heavyweight state is behind `Arc`s.
pub struct Dataset {
    /// Object store used to read and write the dataset's files.
    pub object_store: Arc<ObjectStore>,
    /// Strategy for resolving and committing manifest versions.
    pub(crate) commit_handler: Arc<dyn CommitHandler>,
    // URI the dataset was opened with.
    uri: String,
    /// Root path of the dataset within the object store.
    pub(crate) base: Path,
    /// Manifest of the currently checked-out version.
    pub manifest: Arc<Manifest>,
    /// Where (path/version/e-tag) the current manifest was read from.
    pub(crate) manifest_location: ManifestLocation,
    /// Shared session holding the process-wide caches.
    pub(crate) session: Arc<Session>,
    /// Accessor for tags and branches of this dataset.
    pub refs: Refs,

    /// Bitmap of fragment ids present in the current manifest
    /// (rebuilt whenever a new manifest is checked out).
    pub(crate) fragment_bitmap: Arc<RoaringBitmap>,

    /// Per-dataset view of the session's index cache.
    pub(crate) index_cache: Arc<DSIndexCache>,
    /// Per-dataset view of the session's metadata cache.
    pub(crate) metadata_cache: Arc<DSMetadataCache>,

    /// Options applied when opening data files for reading, if any.
    pub(crate) file_reader_options: Option<FileReaderOptions>,

    /// Store parameters the dataset was opened with, carried along so that
    /// derived handles (e.g. branch checkouts) can reuse them. Boxed to keep
    /// this struct small.
    pub(crate) store_params: Option<Box<ObjectStoreParams>>,
}
179
180impl std::fmt::Debug for Dataset {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 f.debug_struct("Dataset")
183 .field("uri", &self.uri)
184 .field("base", &self.base)
185 .field("version", &self.manifest.version)
186 .field("cache_num_items", &self.session.approx_num_items())
187 .finish()
188 }
189}
190
#[derive(Deserialize, Serialize, Debug)]
/// A summary of a single dataset version, derived from its manifest.
pub struct Version {
    /// Monotonically increasing version number.
    pub version: u64,

    /// Timestamp recorded in the manifest for this version.
    pub timestamp: DateTime<Utc>,

    /// Version metadata, built from the manifest summary (see the
    /// `From<&Manifest>` impl below).
    pub metadata: BTreeMap<String, String>,
}
203
204impl From<&Manifest> for Version {
206 fn from(m: &Manifest) -> Self {
207 Self {
208 version: m.version,
209 timestamp: m.timestamp(),
210 metadata: m.summary().into(),
211 }
212 }
213}
214
#[derive(Clone, Debug)]
/// Parameters controlling how a dataset is opened for reading.
pub struct ReadParams {
    /// Capacity of the index cache, in bytes.
    pub index_cache_size_bytes: usize,

    /// Capacity of the metadata cache, in bytes.
    pub metadata_cache_size_bytes: usize,

    /// Session to share caches with; a new one is created if `None`.
    pub session: Option<Arc<Session>>,

    /// Options passed through to the object store.
    pub store_options: Option<ObjectStoreParams>,

    /// Custom commit handler; the default handler is used if `None`.
    pub commit_handler: Option<Arc<dyn CommitHandler>>,

    /// Options applied when opening data files for reading.
    pub file_reader_options: Option<FileReaderOptions>,
}
256
257impl ReadParams {
258 #[deprecated(
260 since = "0.30.0",
261 note = "Use `index_cache_size_bytes` instead, which accepts a size in bytes."
262 )]
263 pub fn index_cache_size(&mut self, cache_size: usize) -> &mut Self {
264 let assumed_entry_size = 20 * 1024 * 1024; self.index_cache_size_bytes = cache_size * assumed_entry_size;
266 self
267 }
268
269 pub fn index_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
270 self.index_cache_size_bytes = cache_size;
271 self
272 }
273
274 #[deprecated(
276 since = "0.30.0",
277 note = "Use `metadata_cache_size_bytes` instead, which accepts a size in bytes."
278 )]
279 pub fn metadata_cache_size(&mut self, cache_size: usize) -> &mut Self {
280 let assumed_entry_size = 10 * 1024 * 1024; self.metadata_cache_size_bytes = cache_size * assumed_entry_size;
282 self
283 }
284
285 pub fn metadata_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
287 self.metadata_cache_size_bytes = cache_size;
288 self
289 }
290
291 pub fn session(&mut self, session: Arc<Session>) -> &mut Self {
293 self.session = Some(session);
294 self
295 }
296
297 pub fn set_commit_lock<T: CommitLock + Send + Sync + 'static>(&mut self, lock: Arc<T>) {
299 self.commit_handler = Some(Arc::new(lock));
300 }
301
302 pub fn file_reader_options(&mut self, options: FileReaderOptions) -> &mut Self {
304 self.file_reader_options = Some(options);
305 self
306 }
307}
308
309impl Default for ReadParams {
310 fn default() -> Self {
311 Self {
312 index_cache_size_bytes: DEFAULT_INDEX_CACHE_SIZE,
313 metadata_cache_size_bytes: DEFAULT_METADATA_CACHE_SIZE,
314 session: None,
315 store_options: None,
316 commit_handler: None,
317 file_reader_options: None,
318 }
319 }
320}
321
#[derive(Debug, Clone)]
/// A request describing which columns (or expressions) to project.
pub enum ProjectionRequest {
    /// Project an explicit schema.
    Schema(Arc<Schema>),
    /// Project SQL expressions, given as `(output name, expression)` pairs.
    Sql(Vec<(String, String)>),
}
327
328impl ProjectionRequest {
329 pub fn from_columns(
330 columns: impl IntoIterator<Item = impl AsRef<str>>,
331 dataset_schema: &Schema,
332 ) -> Self {
333 let columns = columns
334 .into_iter()
335 .map(|s| s.as_ref().to_string())
336 .collect::<Vec<_>>();
337
338 let mut data_columns = Vec::new();
342 let mut system_fields = Vec::new();
343
344 for col in &columns {
345 if lance_core::is_system_column(col) {
346 if col == ROW_ID {
348 system_fields.push(Field::try_from(ROW_ID_FIELD.clone()).unwrap());
349 } else if col == ROW_ADDR {
350 system_fields.push(Field::try_from(ROW_ADDR_FIELD.clone()).unwrap());
351 }
352 } else {
354 data_columns.push(col.as_str());
355 }
356 }
357
358 let mut schema = dataset_schema.project(&data_columns).unwrap();
360
361 let mut final_fields = Vec::new();
364 for col in &columns {
365 if lance_core::is_system_column(col) {
366 if let Some(field) = system_fields.iter().find(|f| &f.name == col) {
368 final_fields.push(field.clone());
369 }
370 } else {
371 if let Some(field) = schema.fields.iter().find(|f| &f.name == col) {
373 final_fields.push(field.clone());
374 }
375 }
376 }
377
378 schema.fields = final_fields;
379 Self::Schema(Arc::new(schema))
380 }
381
382 pub fn from_schema(schema: Schema) -> Self {
383 Self::Schema(Arc::new(schema))
384 }
385
386 pub fn from_sql(
392 columns: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
393 ) -> Self {
394 Self::Sql(
395 columns
396 .into_iter()
397 .map(|(a, b)| (a.into(), b.into()))
398 .collect(),
399 )
400 }
401
402 pub fn into_projection_plan(self, dataset: Arc<Dataset>) -> Result<ProjectionPlan> {
403 let blob_version = dataset.blob_version();
404 match self {
405 Self::Schema(schema) => {
406 let system_columns_present = schema
409 .fields
410 .iter()
411 .any(|f| lance_core::is_system_column(&f.name));
412
413 if system_columns_present {
414 ProjectionPlan::from_schema(dataset, schema.as_ref(), blob_version)
417 } else {
418 let projection = dataset.schema().project_by_schema(
420 schema.as_ref(),
421 OnMissing::Error,
422 OnTypeMismatch::Error,
423 )?;
424 ProjectionPlan::from_schema(dataset, &projection, blob_version)
425 }
426 }
427 Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns, blob_version),
428 }
429 }
430}
431
432impl From<Arc<Schema>> for ProjectionRequest {
433 fn from(schema: Arc<Schema>) -> Self {
434 Self::Schema(schema)
435 }
436}
437
438impl From<Schema> for ProjectionRequest {
439 fn from(schema: Schema) -> Self {
440 Self::from(Arc::new(schema))
441 }
442}
443
444impl Dataset {
    #[instrument]
    /// Open the dataset at `uri`, checked out at its latest version.
    pub async fn open(uri: &str) -> Result<Self> {
        DatasetBuilder::from_uri(uri).load().await
    }
452
    /// Check out a specific version, tag, or branch/version reference,
    /// returning a new handle. `self` is left unchanged.
    pub async fn checkout_version(&self, version: impl Into<refs::Ref>) -> Result<Self> {
        let reference: refs::Ref = version.into();
        match reference {
            // Explicit branch + version pair.
            refs::Ref::Version(branch, version_number) => {
                self.checkout_by_ref(version_number, branch.as_deref())
                    .await
            }
            // Bare version number: stay on the current branch.
            refs::Ref::VersionNumber(version_number) => {
                self.checkout_by_ref(Some(version_number), self.manifest.branch.as_deref())
                    .await
            }
            // Tag: resolve it to its recorded branch and version first.
            refs::Ref::Tag(tag_name) => {
                let tag_contents = self.tags().get(tag_name.as_str()).await?;
                self.checkout_by_ref(Some(tag_contents.version), tag_contents.branch.as_deref())
                    .await
            }
        }
    }
472
    /// Accessor for this dataset's tags.
    pub fn tags(&self) -> Tags<'_> {
        self.refs.tags()
    }
476
    /// Accessor for this dataset's branches.
    pub fn branches(&self) -> Branches<'_> {
        self.refs.branches()
    }
480
481 pub async fn checkout_latest(&mut self) -> Result<()> {
483 let (manifest, manifest_location) = self.latest_manifest().await?;
484 self.manifest = manifest;
485 self.manifest_location = manifest_location;
486 self.fragment_bitmap = Arc::new(
487 self.manifest
488 .fragments
489 .iter()
490 .map(|f| f.id as u32)
491 .collect(),
492 );
493 Ok(())
494 }
495
    /// Check out the latest version of the named branch.
    pub async fn checkout_branch(&self, branch: &str) -> Result<Self> {
        self.checkout_by_ref(None, Some(branch)).await
    }
500
    /// Create a new branch at the given version and return a handle to it.
    ///
    /// Commits a shallow `Clone` operation at the branch's location, then
    /// registers the branch in the refs store. Note the branch registration
    /// happens *after* the commit succeeds.
    pub async fn create_branch(
        &mut self,
        branch: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        let (source_branch, version_number) = self.resolve_reference(version.into()).await?;
        let branch_location = self.find_branch_location(branch)?;
        // A shallow clone records where the data came from rather than
        // copying it.
        let clone_op = Operation::Clone {
            is_shallow: true,
            ref_name: source_branch.clone(),
            ref_version: version_number,
            ref_path: String::from(self.uri()),
            branch_name: Some(branch.to_string()),
        };
        let transaction = Transaction::new(version_number, clone_op, None);

        let builder = CommitBuilder::new(WriteDestination::Uri(branch_location.uri.as_str()))
            .with_store_params(store_params.unwrap_or_default())
            .with_object_store(Arc::new(self.object_store().clone()))
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let dataset = builder.execute(transaction).await?;

        // Register the branch only once the clone commit has succeeded.
        self.branches()
            .create(branch, version_number, source_branch.as_deref())
            .await?;
        Ok(dataset)
    }
546
    /// Delete a branch (non-forced; see [`Self::force_delete_branch`]).
    pub async fn delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, false).await
    }
550
    /// Forcibly delete a branch.
    pub async fn force_delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, true).await
    }
556
    /// List all branches and their contents, keyed by branch name.
    pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
        self.branches().list().await
    }
560
    /// Whether this handle is already checked out at the given manifest
    /// location on the given branch.
    ///
    /// An e-tag is required on BOTH sides for a match; if either e-tag is
    /// absent the answer is conservatively `false`.
    fn already_checked_out(&self, location: &ManifestLocation, branch_name: Option<&str>) -> bool {
        self.manifest.branch.as_deref() == branch_name
            && self.manifest.version == location.version
            && self.manifest_location.naming_scheme == location.naming_scheme
            && location.e_tag.as_ref().is_some_and(|e_tag| {
                self.manifest_location
                    .e_tag
                    .as_ref()
                    .is_some_and(|current_e_tag| e_tag == current_e_tag)
            })
    }
574
    /// Check out the given version (or the latest version when `None`) of the
    /// given branch (or the current branch when `None`), returning a new
    /// handle.
    async fn checkout_by_ref(
        &self,
        version_number: Option<u64>,
        branch: Option<&str>,
    ) -> Result<Self> {
        let new_location = self.branch_location().find_branch(branch)?;

        // Resolve the manifest location either for the requested version or
        // for whatever the latest version of the branch is.
        let manifest_location = if let Some(version_number) = version_number {
            self.commit_handler
                .resolve_version_location(
                    &new_location.path,
                    version_number,
                    &self.object_store.inner,
                )
                .await?
        } else {
            self.commit_handler
                .resolve_latest_location(&new_location.path, &self.object_store)
                .await?
        };

        // Short-circuit: if we are already at this exact manifest, a clone of
        // this handle is the answer.
        if self.already_checked_out(&manifest_location, branch) {
            return Ok(self.clone());
        }

        let manifest = Self::load_manifest(
            self.object_store.as_ref(),
            &manifest_location,
            &new_location.uri,
            self.session.as_ref(),
        )
        .await?;
        Self::checkout_manifest(
            self.object_store.clone(),
            new_location.path,
            new_location.uri,
            Arc::new(manifest),
            manifest_location,
            self.session.clone(),
            self.commit_handler.clone(),
            self.file_reader_options.clone(),
            self.store_params.as_deref().cloned(),
        )
    }
619
    /// Read and decode the manifest at `manifest_location`.
    ///
    /// The last block of the file is fetched once; when the manifest (and its
    /// optional index/transaction sections) fall entirely within that block
    /// they are decoded from it directly, avoiding extra reads. Decoded index
    /// metadata and transactions are inserted into the session caches as a
    /// side effect.
    pub(crate) async fn load_manifest(
        object_store: &ObjectStore,
        manifest_location: &ManifestLocation,
        uri: &str,
        session: &Session,
    ) -> Result<Manifest> {
        // Open with a known size when available, saving a HEAD request.
        let object_reader = if let Some(size) = manifest_location.size {
            object_store
                .open_with_size(&manifest_location.path, size as usize)
                .await
        } else {
            object_store.open(&manifest_location.path).await
        };
        // A missing manifest file means the dataset itself doesn't exist.
        let object_reader = object_reader.map_err(|e| match &e {
            Error::NotFound { uri, .. } => Error::DatasetNotFound {
                path: uri.clone(),
                source: box_error(e),
                location: location!(),
            },
            _ => e,
        })?;

        let last_block =
            read_last_block(object_reader.as_ref())
                .await
                .map_err(|err| match err {
                    object_store::Error::NotFound { path, source } => Error::DatasetNotFound {
                        path,
                        source,
                        location: location!(),
                    },
                    _ => Error::IO {
                        source: err.into(),
                        location: location!(),
                    },
                })?;
        // Byte offset of the manifest message within the file, read from the
        // file footer.
        let offset = read_metadata_offset(&last_block)?;

        let manifest_size = object_reader.size().await?;
        // Fast path: the manifest message fits inside the block we already
        // read; decode it in place. Otherwise fall back to a ranged read.
        // Each message is stored as a u32 little-endian length prefix
        // followed by the protobuf payload.
        let mut manifest = if manifest_size - offset <= last_block.len() {
            let manifest_len = manifest_size - offset;
            let offset_in_block = last_block.len() - manifest_len;
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
        } else {
            read_struct(object_reader.as_ref(), offset).await
        }?;

        // Refuse to read datasets written with feature flags we don't
        // understand.
        if !can_read_dataset(manifest.reader_feature_flags) {
            let message = format!(
                "This dataset cannot be read by this version of Lance. \
                 Please upgrade Lance to read this dataset.\n Flags: {}",
                manifest.reader_feature_flags
            );
            return Err(Error::NotSupported {
                source: message.into(),
                location: location!(),
            });
        }

        // Opportunistically decode the index section if it also fits in the
        // last block, and warm the index cache with it.
        if let Some(index_offset) = manifest.index_section {
            if manifest_size - index_offset <= last_block.len() {
                let offset_in_block = last_block.len() - (manifest_size - index_offset);
                let message_len =
                    LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4])
                        as usize;
                let message_data =
                    &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
                let section = lance_table::format::pb::IndexSection::decode(message_data)?;
                let mut indices: Vec<IndexMetadata> = section
                    .indices
                    .into_iter()
                    .map(IndexMetadata::try_from)
                    .collect::<Result<Vec<_>>>()?;
                retain_supported_indices(&mut indices);
                let ds_index_cache = session.index_cache.for_dataset(uri);
                let metadata_key = crate::session::index_caches::IndexMetadataKey {
                    version: manifest_location.version,
                };
                ds_index_cache
                    .insert_with_key(&metadata_key, Arc::new(indices))
                    .await;
            }
        }

        // Likewise for the transaction section: decode it from the last
        // block when possible and warm the metadata cache.
        if let Some(transaction_offset) = manifest.transaction_section {
            if manifest_size - transaction_offset <= last_block.len() {
                let offset_in_block = last_block.len() - (manifest_size - transaction_offset);
                let message_len =
                    LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4])
                        as usize;
                let message_data =
                    &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
                let transaction: Transaction =
                    lance_table::format::pb::Transaction::decode(message_data)?.try_into()?;

                let metadata_cache = session.metadata_cache.for_dataset(uri);
                let metadata_key = TransactionKey {
                    version: manifest_location.version,
                };
                metadata_cache
                    .insert_with_key(&metadata_key, Arc::new(transaction))
                    .await;
            }
        }

        // Legacy-format files store dictionaries separately; load them into
        // the schema now.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
        }

        Ok(manifest)
    }
739
    #[allow(clippy::too_many_arguments)]
    /// Assemble a [`Dataset`] handle from an already-loaded manifest plus its
    /// surrounding state (store, caches, commit handler, …).
    fn checkout_manifest(
        object_store: Arc<ObjectStore>,
        base_path: Path,
        uri: String,
        manifest: Arc<Manifest>,
        manifest_location: ManifestLocation,
        session: Arc<Session>,
        commit_handler: Arc<dyn CommitHandler>,
        file_reader_options: Option<FileReaderOptions>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        let refs = Refs::new(
            object_store.clone(),
            commit_handler.clone(),
            BranchLocation {
                path: base_path.clone(),
                uri: uri.clone(),
                branch: manifest.branch.clone(),
            },
        );
        // Per-dataset views over the shared session caches.
        let metadata_cache = Arc::new(session.metadata_cache.for_dataset(&uri));
        let index_cache = Arc::new(session.index_cache.for_dataset(&uri));
        // Bitmap of fragment ids present in this manifest.
        let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
        Ok(Self {
            object_store,
            base: base_path,
            uri,
            manifest,
            manifest_location,
            commit_handler,
            session,
            refs,
            fragment_bitmap,
            metadata_cache,
            index_cache,
            file_reader_options,
            store_params: store_params.map(Box::new),
        })
    }
780
781 pub async fn write(
789 batches: impl RecordBatchReader + Send + 'static,
790 dest: impl Into<WriteDestination<'_>>,
791 params: Option<WriteParams>,
792 ) -> Result<Self> {
793 let mut builder = InsertBuilder::new(dest);
794 if let Some(params) = ¶ms {
795 builder = builder.with_params(params);
796 }
797 Box::pin(builder.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>))
798 .await
799 }
800
    /// Write `batches` into a table managed by a [`LanceNamespace`].
    ///
    /// For `Create` mode the table is declared in the namespace first (falling
    /// back to `create_empty_table` for namespaces that don't support
    /// `declare_table`); for `Append`/`Overwrite` the existing table is looked
    /// up with `describe_table`. In both cases, storage options returned by
    /// the namespace are merged over any caller-supplied ones.
    ///
    /// NOTE(review): the storage-options merging logic is duplicated between
    /// the two match arms; a shared helper would keep them in sync.
    pub async fn write_into_namespace(
        batches: impl RecordBatchReader + Send + 'static,
        namespace: Arc<dyn LanceNamespace>,
        table_id: Vec<String>,
        mut params: Option<WriteParams>,
    ) -> Result<Self> {
        let mut write_params = params.take().unwrap_or_default();

        match write_params.mode {
            WriteMode::Create => {
                let declare_request = DeclareTableRequest {
                    id: Some(table_id.clone()),
                    ..Default::default()
                };
                #[allow(deprecated)]
                let response = match namespace.declare_table(declare_request).await {
                    Ok(resp) => resp,
                    // Older namespaces only support the deprecated
                    // create_empty_table; adapt its response to the
                    // declare_table shape.
                    Err(Error::NotSupported { .. }) => {
                        let fallback_request = CreateEmptyTableRequest {
                            id: Some(table_id.clone()),
                            ..Default::default()
                        };
                        let fallback_resp = namespace
                            .create_empty_table(fallback_request)
                            .await
                            .map_err(|e| Error::Namespace {
                                source: Box::new(e),
                                location: location!(),
                            })?;
                        DeclareTableResponse {
                            transaction_id: fallback_resp.transaction_id,
                            location: fallback_resp.location,
                            storage_options: fallback_resp.storage_options,
                        }
                    }
                    Err(e) => {
                        return Err(Error::Namespace {
                            source: Box::new(e),
                            location: location!(),
                        });
                    }
                };

                let uri = response.location.ok_or_else(|| Error::Namespace {
                    source: Box::new(std::io::Error::other(
                        "Table location not found in declare_table response",
                    )),
                    location: location!(),
                })?;

                // Merge namespace-provided storage options over the caller's,
                // and install an accessor that can refresh them on demand.
                if let Some(namespace_storage_options) = response.storage_options {
                    let provider: Arc<dyn StorageOptionsProvider> = Arc::new(
                        LanceNamespaceStorageOptionsProvider::new(namespace, table_id),
                    );

                    // Namespace options take precedence over caller options.
                    let mut merged_options = write_params
                        .store_params
                        .as_ref()
                        .and_then(|p| p.storage_options().cloned())
                        .unwrap_or_default();
                    merged_options.extend(namespace_storage_options);

                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
                        merged_options,
                        provider,
                    ));

                    let existing_params = write_params.store_params.take().unwrap_or_default();
                    write_params.store_params = Some(ObjectStoreParams {
                        storage_options_accessor: Some(accessor),
                        ..existing_params
                    });
                }

                Self::write(batches, uri.as_str(), Some(write_params)).await
            }
            WriteMode::Append | WriteMode::Overwrite => {
                // The table must already exist; look up its location.
                let request = DescribeTableRequest {
                    id: Some(table_id.clone()),
                    ..Default::default()
                };
                let response =
                    namespace
                        .describe_table(request)
                        .await
                        .map_err(|e| Error::Namespace {
                            source: Box::new(e),
                            location: location!(),
                        })?;

                let uri = response.location.ok_or_else(|| Error::Namespace {
                    source: Box::new(std::io::Error::other(
                        "Table location not found in describe_table response",
                    )),
                    location: location!(),
                })?;

                // Same merge as the Create arm (see NOTE above).
                if let Some(namespace_storage_options) = response.storage_options {
                    let provider: Arc<dyn StorageOptionsProvider> =
                        Arc::new(LanceNamespaceStorageOptionsProvider::new(
                            namespace.clone(),
                            table_id.clone(),
                        ));

                    let mut merged_options = write_params
                        .store_params
                        .as_ref()
                        .and_then(|p| p.storage_options().cloned())
                        .unwrap_or_default();
                    merged_options.extend(namespace_storage_options);

                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
                        merged_options,
                        provider,
                    ));

                    let existing_params = write_params.store_params.take().unwrap_or_default();
                    write_params.store_params = Some(ObjectStoreParams {
                        storage_options_accessor: Some(accessor),
                        ..existing_params
                    });
                }

                // Load the existing dataset with the (possibly augmented)
                // storage-options accessor so the write sees the same store.
                let mut builder = DatasetBuilder::from_uri(uri.as_str());
                if let Some(ref store_params) = write_params.store_params {
                    if let Some(accessor) = &store_params.storage_options_accessor {
                        builder = builder.with_storage_options_accessor(accessor.clone());
                    }
                }
                let dataset = Arc::new(builder.load().await?);

                Self::write(batches, dataset, Some(write_params)).await
            }
        }
    }
957
958 pub async fn append(
962 &mut self,
963 batches: impl RecordBatchReader + Send + 'static,
964 params: Option<WriteParams>,
965 ) -> Result<()> {
966 let write_params = WriteParams {
967 mode: WriteMode::Append,
968 ..params.unwrap_or_default()
969 };
970
971 let new_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone())))
972 .with_params(&write_params)
973 .execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>)
974 .await?;
975
976 *self = new_dataset;
977
978 Ok(())
979 }
980
    /// The URI this dataset handle refers to.
    pub fn uri(&self) -> &str {
        &self.uri
    }
985
986 pub fn branch_location(&self) -> BranchLocation {
987 BranchLocation {
988 path: self.base.clone(),
989 uri: self.uri.clone(),
990 branch: self.manifest.branch.clone(),
991 }
992 }
993
994 pub fn find_branch_location(&self, branch_name: &str) -> Result<BranchLocation> {
995 let current_location = BranchLocation {
996 path: self.base.clone(),
997 uri: self.uri.clone(),
998 branch: self.manifest.branch.clone(),
999 };
1000 current_location.find_branch(Some(branch_name))
1001 }
1002
    /// The manifest of the currently checked-out version.
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }
1007
    /// Where the currently checked-out manifest was read from.
    pub fn manifest_location(&self) -> &ManifestLocation {
        &self.manifest_location
    }
1011
    /// Start building a delta (comparison between versions) of this dataset.
    pub fn delta(&self) -> delta::DatasetDeltaBuilder {
        delta::DatasetDeltaBuilder::new(self.clone())
    }
1029
1030 pub(crate) fn is_legacy_storage(&self) -> bool {
1032 self.manifest
1033 .data_storage_format
1034 .lance_file_version()
1035 .unwrap()
1036 == LanceFileVersion::Legacy
1037 }
1038
    /// Resolve and load the latest manifest of the dataset, consulting the
    /// metadata cache (and the currently checked-out manifest) before
    /// reading from storage. The loaded manifest is inserted into the cache.
    pub async fn latest_manifest(&self) -> Result<(Arc<Manifest>, ManifestLocation)> {
        let location = self
            .commit_handler
            .resolve_latest_location(&self.base, &self.object_store)
            .await?;

        // Cache hit: the key includes version and e-tag, so a stale entry
        // cannot match a rewritten manifest.
        let manifest_key = ManifestKey {
            version: location.version,
            e_tag: location.e_tag.as_deref(),
        };
        let cached_manifest = self.metadata_cache.get_with_key(&manifest_key).await;
        if let Some(cached_manifest) = cached_manifest {
            return Ok((cached_manifest, location));
        }

        // If the latest version is the one we already have checked out,
        // avoid the read entirely.
        if self.already_checked_out(&location, self.manifest.branch.as_deref()) {
            return Ok((self.manifest.clone(), self.manifest_location.clone()));
        }
        let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?;
        // Legacy-format files keep dictionaries outside the manifest; load
        // them into the schema when present.
        if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() {
            let reader = if let Some(size) = location.size {
                self.object_store
                    .open_with_size(&location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&location.path).await?
            };
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let manifest_arc = Arc::new(manifest);
        self.metadata_cache
            .insert_with_key(&manifest_key, manifest_arc.clone())
            .await;
        Ok((manifest_arc, location))
    }
1075
    /// Read the transaction that produced the current version, if one was
    /// recorded. Checks the metadata cache first, then the manifest's
    /// embedded transaction section, then the standalone transaction file.
    /// Returns `Ok(None)` when no transaction was recorded.
    pub async fn read_transaction(&self) -> Result<Option<Transaction>> {
        let transaction_key = TransactionKey {
            version: self.manifest.version,
        };
        if let Some(transaction) = self.metadata_cache.get_with_key(&transaction_key).await {
            return Ok(Some((*transaction).clone()));
        }

        let transaction = if let Some(pos) = self.manifest.transaction_section {
            // Newer layout: the transaction is embedded in the manifest file
            // at a known offset.
            let reader = if let Some(size) = self.manifest_location.size {
                self.object_store
                    .open_with_size(&self.manifest_location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&self.manifest_location.path).await?
            };

            let tx: pb::Transaction = read_message(reader.as_ref(), pos).await?;
            Transaction::try_from(tx).map(Some)?
        } else if let Some(path) = &self.manifest.transaction_file {
            // Older layout: a standalone file under the transactions dir.
            let path = self.transactions_dir().child(path.as_str());
            let data = self.object_store.inner.get(&path).await?.bytes().await?;
            let transaction = lance_table::format::pb::Transaction::decode(data)?;
            Transaction::try_from(transaction).map(Some)?
        } else {
            None
        };

        // Populate the cache for subsequent reads of this version.
        if let Some(tx) = transaction.as_ref() {
            self.metadata_cache
                .insert_with_key(&transaction_key, Arc::new(tx.clone()))
                .await;
        }
        Ok(transaction)
    }
1117
1118 pub async fn read_transaction_by_version(&self, version: u64) -> Result<Option<Transaction>> {
1123 let dataset_version = self.checkout_version(version).await?;
1124 dataset_version.read_transaction().await
1125 }
1126
1127 pub async fn get_transactions(
1142 &self,
1143 recent_transactions: usize,
1144 ) -> Result<Vec<Option<Transaction>>> {
1145 let mut transactions = vec![];
1146 let mut dataset = self.clone();
1147
1148 loop {
1149 let transaction = dataset.read_transaction().await?;
1150 transactions.push(transaction);
1151
1152 if transactions.len() >= recent_transactions {
1153 break;
1154 } else {
1155 match dataset
1156 .checkout_version(dataset.version().version - 1)
1157 .await
1158 {
1159 Ok(ds) => dataset = ds,
1160 Err(Error::DatasetNotFound { .. }) => break,
1161 Err(err) => return Err(err),
1162 }
1163 }
1164 }
1165
1166 Ok(transactions)
1167 }
1168
    /// Restore the dataset to the currently checked-out version by committing
    /// a `Restore` operation on top of the latest version.
    pub async fn restore(&mut self) -> Result<()> {
        let (latest_manifest, _) = self.latest_manifest().await?;
        let latest_version = latest_manifest.version;

        // The restore is itself a new commit whose read version is the
        // latest version, restoring the content of the checked-out version.
        let transaction = Transaction::new(
            latest_version,
            Operation::Restore {
                version: self.manifest.version,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }
1187
1188 #[instrument(level = "debug", skip(self))]
1209 pub fn cleanup_old_versions(
1210 &self,
1211 older_than: Duration,
1212 delete_unverified: Option<bool>,
1213 error_if_tagged_old_versions: Option<bool>,
1214 ) -> BoxFuture<'_, Result<RemovalStats>> {
1215 let mut builder = CleanupPolicyBuilder::default();
1216 builder = builder.before_timestamp(utc_now() - older_than);
1217 if let Some(v) = delete_unverified {
1218 builder = builder.delete_unverified(v);
1219 }
1220 if let Some(v) = error_if_tagged_old_versions {
1221 builder = builder.error_if_tagged_old_versions(v);
1222 }
1223
1224 self.cleanup_with_policy(builder.build())
1225 }
1226
    #[instrument(level = "debug", skip(self))]
    /// Remove old versions according to `policy`, returning removal
    /// statistics. Emits a tracing event before starting.
    pub fn cleanup_with_policy(
        &self,
        policy: CleanupPolicy,
    ) -> BoxFuture<'_, Result<RemovalStats>> {
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.uri);
        cleanup::cleanup_old_versions(self, policy).boxed()
    }
1250
    #[allow(clippy::too_many_arguments)]
    /// Shared implementation behind [`Self::commit`] and
    /// [`Self::commit_detached`].
    ///
    /// `read_version` may be omitted only for `Overwrite`/`Restore`
    /// operations (it defaults to 0 there); all other operations require it.
    async fn do_commit(
        base_uri: WriteDestination<'_>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
        detached: bool,
    ) -> Result<Self> {
        let read_version = read_version.map_or_else(
            || match operation {
                Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
                _ => Err(Error::invalid_input(
                    "read_version must be specified for this operation",
                    location!(),
                )),
            },
            Ok,
        )?;

        let transaction = Transaction::new(read_version, operation, None);

        let mut builder = CommitBuilder::new(base_uri)
            .enable_v2_manifest_paths(enable_v2_manifest_paths)
            .with_session(session)
            .with_detached(detached);

        if let Some(store_params) = store_params {
            builder = builder.with_store_params(store_params);
        }

        if let Some(commit_handler) = commit_handler {
            builder = builder.with_commit_handler(commit_handler);
        }

        builder.execute(transaction).await
    }
1290
    /// Commit `operation` against the dataset at `dest`, producing a new
    /// (attached) version in the main history.
    ///
    /// See [`Self::do_commit`] for parameter semantics.
    pub async fn commit(
        dest: impl Into<WriteDestination<'_>>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
    ) -> Result<Self> {
        Self::do_commit(
            dest.into(),
            operation,
            read_version,
            store_params,
            commit_handler,
            session,
            enable_v2_manifest_paths,
            false,
        )
        .await
    }
1346
    /// Commit `operation` as a detached version: the commit is not part of the
    /// main version history.
    ///
    /// See [`Self::do_commit`] for parameter semantics.
    pub async fn commit_detached(
        dest: impl Into<WriteDestination<'_>>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
    ) -> Result<Self> {
        Self::do_commit(
            dest.into(),
            operation,
            read_version,
            store_params,
            commit_handler,
            session,
            enable_v2_manifest_paths,
            true,
        )
        .await
    }
1376
    /// Commit `transaction` against this dataset and update `self` in place to
    /// the newly committed version.
    pub(crate) async fn apply_commit(
        &mut self,
        transaction: Transaction,
        write_config: &ManifestWriteConfig,
        commit_config: &CommitConfig,
    ) -> Result<()> {
        let (manifest, manifest_location) = commit_transaction(
            self,
            self.object_store(),
            self.commit_handler.as_ref(),
            &transaction,
            write_config,
            commit_config,
            self.manifest_location.naming_scheme,
            None,
        )
        .await?;

        // Swap in the new manifest and recompute the fragment-id bitmap so
        // subsequent reads on this handle see the committed version.
        self.manifest = Arc::new(manifest);
        self.manifest_location = manifest_location;
        self.fragment_bitmap = Arc::new(
            self.manifest
                .fragments
                .iter()
                .map(|f| f.id as u32)
                .collect(),
        );

        Ok(())
    }
1407
    /// Create a new [`Scanner`] over a clone of this dataset.
    pub fn scan(&self) -> Scanner {
        Scanner::new(Arc::new(self.clone()))
    }
1412
1413 #[instrument(skip_all)]
1417 pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
1418 if let Some(filter) = filter {
1420 let mut scanner = self.scan();
1421 scanner.filter(&filter)?;
1422 Ok(scanner
1423 .project::<String>(&[])?
1424 .with_row_id() .count_rows()
1426 .await? as usize)
1427 } else {
1428 self.count_all_rows().await
1429 }
1430 }
1431
1432 pub(crate) async fn count_all_rows(&self) -> Result<usize> {
1433 let cnts = stream::iter(self.get_fragments())
1434 .map(|f| async move { f.count_rows(None).await })
1435 .buffer_unordered(16)
1436 .try_collect::<Vec<_>>()
1437 .await?;
1438 Ok(cnts.iter().sum())
1439 }
1440
    /// Take rows by offset (position within the dataset), returning them as a
    /// single [`RecordBatch`] with the requested projection.
    #[instrument(skip_all, fields(num_rows=row_indices.len()))]
    pub async fn take(
        &self,
        row_indices: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<RecordBatch> {
        take::take(self, row_indices, projection.into()).await
    }
1450
    /// Take rows by row id (not offsets), returning them as a single
    /// [`RecordBatch`] with the requested projection.
    pub async fn take_rows(
        &self,
        row_ids: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<RecordBatch> {
        Arc::new(self.clone())
            .take_builder(row_ids, projection)?
            .execute()
            .await
    }

    /// Build a configurable take operation over the given row ids.
    pub fn take_builder(
        self: &Arc<Self>,
        row_ids: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<TakeBuilder> {
        TakeBuilder::try_new_from_ids(self.clone(), row_ids.to_vec(), projection.into())
    }
1503
    /// Take [`BlobFile`] handles for `column` by row id.
    pub async fn take_blobs(
        self: &Arc<Self>,
        row_ids: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        blob::take_blobs(self, row_ids, column.as_ref()).await
    }

    /// Take [`BlobFile`] handles for `column` by row address.
    pub async fn take_blobs_by_addresses(
        self: &Arc<Self>,
        row_addrs: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        blob::take_blobs_by_addresses(self, row_addrs, column.as_ref()).await
    }

    /// Take [`BlobFile`] handles for `column` by row offset; offsets are first
    /// translated to row addresses.
    pub async fn take_blobs_by_indices(
        self: &Arc<Self>,
        row_indices: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?;
        blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await
    }
1537
    /// Stream batches for the given ranges of row offsets.
    ///
    /// `batch_readahead` bounds how many range reads are in flight at once.
    pub fn take_scan(
        &self,
        row_ranges: Pin<Box<dyn Stream<Item = Result<Range<u64>>> + Send>>,
        projection: Arc<Schema>,
        batch_readahead: usize,
    ) -> DatasetRecordBatchStream {
        take::take_scan(self, row_ranges, projection, batch_readahead)
    }
1549
    /// Take `n` rows sampled uniformly at random (without replacement) from
    /// the dataset, with the given projection.
    pub(crate) async fn sample(&self, n: usize, projection: &Schema) -> Result<RecordBatch> {
        use rand::seq::IteratorRandom;
        let num_rows = self.count_rows(None).await?;
        // choose_multiple returns fewer than n ids when num_rows < n.
        let ids = (0..num_rows as u64).choose_multiple(&mut rand::rng(), n);
        self.take(&ids, projection.clone()).await
    }
1557
    /// Delete all rows matching `predicate` (a SQL-style filter expression),
    /// committing a new version.
    pub async fn delete(&mut self, predicate: &str) -> Result<()> {
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DELETING_EVENT, uri = &self.uri, predicate=predicate);
        write::delete::delete(self, predicate).await
    }

    /// Delete every row in the dataset (implemented as `delete("true")`).
    pub async fn truncate_table(&mut self) -> Result<()> {
        self.delete("true").await
    }
1568
    /// Register additional base paths on the dataset via an `UpdateBases`
    /// commit, returning the dataset at the new version.
    pub async fn add_bases(
        self: &Arc<Self>,
        new_bases: Vec<lance_table::format::BasePath>,
        transaction_properties: Option<HashMap<String, String>>,
    ) -> Result<Self> {
        let operation = Operation::UpdateBases { new_bases };

        let transaction = TransactionBuilder::new(self.manifest.version, operation)
            .transaction_properties(transaction_properties.map(Arc::new))
            .build();

        let new_dataset = CommitBuilder::new(self.clone())
            .execute(transaction)
            .await?;

        Ok(new_dataset)
    }
1601
    /// Count the total number of deleted rows across all fragments,
    /// fetching per-fragment counts concurrently.
    pub async fn count_deleted_rows(&self) -> Result<usize> {
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.count_deletions().await })
            .buffer_unordered(self.object_store.io_parallelism())
            .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x)))
            .await
    }
1609
    /// The object store this dataset reads and writes through.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }

    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
        self.initial_storage_options()
    }

    /// The storage options this dataset was opened with, if any.
    pub fn initial_storage_options(&self) -> Option<&HashMap<String, String>> {
        self.store_params
            .as_ref()
            .and_then(|params| params.storage_options())
    }

    /// The provider used to refresh storage options, if one was configured.
    pub fn storage_options_provider(
        &self,
    ) -> Option<Arc<dyn lance_io::object_store::StorageOptionsProvider>> {
        self.store_params
            .as_ref()
            .and_then(|params| params.storage_options_accessor.as_ref())
            .and_then(|accessor| accessor.provider().cloned())
    }

    /// The accessor wrapping storage options (and their provider), if any.
    pub fn storage_options_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
        self.store_params
            .as_ref()
            .and_then(|params| params.get_accessor())
    }

    /// Fetch the freshest storage options available: from the accessor when
    /// one is configured (may refresh, hence async + fallible), otherwise the
    /// options captured at open time.
    pub async fn latest_storage_options(&self) -> Result<Option<StorageOptions>> {
        if let Some(accessor) = self.storage_options_accessor() {
            let options = accessor.get_storage_options().await?;
            return Ok(Some(options));
        }

        Ok(self.initial_storage_options().cloned().map(StorageOptions))
    }
1675
    /// Path to the dataset's data-file directory.
    pub fn data_dir(&self) -> Path {
        self.base.child(DATA_DIR)
    }

    /// Path to the dataset's index directory.
    pub fn indices_dir(&self) -> Path {
        self.base.child(INDICES_DIR)
    }

    /// Path to the dataset's transaction-file directory.
    pub fn transactions_dir(&self) -> Path {
        self.base.child(TRANSACTIONS_DIR)
    }

    /// Path to the dataset's deletion-file directory.
    pub fn deletions_dir(&self) -> Path {
        self.base.child(DELETIONS_DIR)
    }

    /// Path to the dataset's manifest/versions directory.
    pub fn versions_dir(&self) -> Path {
        self.base.child(VERSIONS_DIR)
    }
1695
    /// Resolve the directory a data file lives in.
    ///
    /// Files with a `base_id` resolve through the manifest's registered base
    /// paths (appending `DATA_DIR` when the base is a dataset root); files
    /// without one live under this dataset's own data directory.
    ///
    /// # Errors
    /// Returns `Error::invalid_input` if the referenced `base_id` is not
    /// registered in the manifest.
    pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result<Path> {
        match data_file.base_id.as_ref() {
            Some(base_id) => {
                let base_paths = &self.manifest.base_paths;
                let base_path = base_paths.get(base_id).ok_or_else(|| {
                    Error::invalid_input(
                        format!(
                            "base_path id {} not found for data_file {}",
                            base_id, data_file.path
                        ),
                        location!(),
                    )
                })?;
                let path = base_path.extract_path(self.session.store_registry())?;
                if base_path.is_dataset_root {
                    Ok(path.child(DATA_DIR))
                } else {
                    Ok(path)
                }
            }
            None => Ok(self.base.child(DATA_DIR)),
        }
    }
1719
1720 pub(crate) async fn object_store_for_base(&self, base_id: u32) -> Result<Arc<ObjectStore>> {
1722 let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
1723 Error::invalid_input(
1724 format!("Dataset base path with ID {} not found", base_id),
1725 Default::default(),
1726 )
1727 })?;
1728
1729 let (store, _) = ObjectStore::from_uri_and_params(
1730 self.session.store_registry(),
1731 &base_path.path,
1732 &self.store_params.as_deref().cloned().unwrap_or_default(),
1733 )
1734 .await?;
1735
1736 Ok(store)
1737 }
1738
    /// Resolve the dataset root directory a deletion file belongs to.
    ///
    /// Unlike data files, deletion files may only live under a base path that
    /// is itself a dataset root; anything else is an internal error.
    pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result<Path> {
        match deletion_file.base_id.as_ref() {
            Some(base_id) => {
                let base_paths = &self.manifest.base_paths;
                let base_path = base_paths.get(base_id).ok_or_else(|| {
                    Error::invalid_input(
                        format!(
                            "base_path id {} not found for deletion_file {:?}",
                            base_id, deletion_file
                        ),
                        location!(),
                    )
                })?;

                if !base_path.is_dataset_root {
                    return Err(Error::Internal {
                        message: format!(
                            "base_path id {} is not a dataset root for deletion_file {:?}",
                            base_id, deletion_file
                        ),
                        location: location!(),
                    });
                }
                base_path.extract_path(self.session.store_registry())
            }
            None => Ok(self.base.clone()),
        }
    }
1767
    /// Resolve the directory an index's files live in.
    ///
    /// Mirrors [`Self::data_file_dir`]: a `base_id` resolves through the
    /// manifest's base paths (appending `INDICES_DIR` for dataset roots);
    /// otherwise the dataset's own index directory is used.
    pub(crate) fn indice_files_dir(&self, index: &IndexMetadata) -> Result<Path> {
        match index.base_id.as_ref() {
            Some(base_id) => {
                let base_paths = &self.manifest.base_paths;
                let base_path = base_paths.get(base_id).ok_or_else(|| {
                    Error::invalid_input(
                        format!(
                            "base_path id {} not found for index {}",
                            base_id, index.uuid
                        ),
                        location!(),
                    )
                })?;
                let path = base_path.extract_path(self.session.store_registry())?;
                if base_path.is_dataset_root {
                    Ok(path.child(INDICES_DIR))
                } else {
                    Ok(path)
                }
            }
            None => Ok(self.base.child(INDICES_DIR)),
        }
    }
1793
    /// The session (caches, store registry) shared by this dataset.
    pub fn session(&self) -> Arc<Session> {
        self.session.clone()
    }

    /// The currently checked-out version's metadata.
    pub fn version(&self) -> Version {
        Version::from(self.manifest.as_ref())
    }

    /// Number of entries currently held in the session's index cache.
    pub async fn index_cache_entry_count(&self) -> usize {
        self.session.index_cache.size().await
    }

    /// Hit ratio of the session's index cache.
    pub async fn index_cache_hit_rate(&self) -> f32 {
        let stats = self.session.index_cache_stats().await;
        stats.hit_ratio()
    }

    /// Approximate heap size of the session's caches, in bytes.
    pub fn cache_size_bytes(&self) -> u64 {
        self.session.deep_size_of() as u64
    }
1816
1817 pub async fn versions(&self) -> Result<Vec<Version>> {
1819 let mut versions: Vec<Version> = self
1820 .commit_handler
1821 .list_manifest_locations(&self.base, &self.object_store, false)
1822 .try_filter_map(|location| async move {
1823 match read_manifest(&self.object_store, &location.path, location.size).await {
1824 Ok(manifest) => Ok(Some(Version::from(&manifest))),
1825 Err(e) => Err(e),
1826 }
1827 })
1828 .try_collect()
1829 .await?;
1830
1831 versions.sort_by_key(|v| v.version);
1833
1834 Ok(versions)
1835 }
1836
    /// The newest version number currently committed to storage (which may be
    /// newer than the version this handle has checked out).
    pub async fn latest_version_id(&self) -> Result<u64> {
        Ok(self
            .commit_handler
            .resolve_latest_location(&self.base, &self.object_store)
            .await?
            .version)
    }

    /// Number of fragments in the current version.
    pub fn count_fragments(&self) -> usize {
        self.manifest.fragments.len()
    }

    /// The schema of the current version.
    pub fn schema(&self) -> &Schema {
        &self.manifest.schema
    }

    /// An empty projection over this dataset (no columns selected).
    pub fn empty_projection(self: &Arc<Self>) -> Projection {
        Projection::empty(self.clone()).with_blob_version(self.blob_version())
    }

    /// A projection selecting every column of this dataset.
    pub fn full_projection(self: &Arc<Self>) -> Projection {
        Projection::full(self.clone()).with_blob_version(self.blob_version())
    }
1867
    /// All fragments of the current version, wrapped as [`FileFragment`]s
    /// sharing one clone of this dataset.
    pub fn get_fragments(&self) -> Vec<FileFragment> {
        let dataset = Arc::new(self.clone());
        self.manifest
            .fragments
            .iter()
            .map(|f| FileFragment::new(dataset.clone(), f.clone()))
            .collect()
    }

    /// Look up a single fragment by id; `None` if no fragment has that id.
    pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
        let dataset = Arc::new(self.clone());
        let fragment = self
            .manifest
            .fragments
            .iter()
            .find(|f| f.id == fragment_id as u64)?;
        Some(FileFragment::new(dataset, fragment.clone()))
    }

    /// The raw fragment metadata of the current version.
    pub fn fragments(&self) -> &Arc<Vec<Fragment>> {
        &self.manifest.fragments
    }
1891
    /// Look up fragments for a list of ids that must be sorted ascending and
    /// unique (asserted below).
    ///
    /// Returns one entry per requested id, in the same order: `Some(frag)` if
    /// the id exists in the manifest, `None` otherwise. Implemented as a
    /// single merge-style walk over the manifest's (id-ordered) fragment list.
    pub fn get_frags_from_ordered_ids(&self, ordered_ids: &[u32]) -> Vec<Option<FileFragment>> {
        let mut fragments = Vec::with_capacity(ordered_ids.len());
        let mut id_iter = ordered_ids.iter();
        let mut id = id_iter.next();
        // last_id tracks the previously consumed id to assert strict ordering;
        // -1 lets id 0 pass the first check.
        let mut last_id: i64 = -1;
        for frag in self.manifest.fragments.iter() {
            let mut the_id = if let Some(id) = id { *id } else { break };
            assert!(the_id as i64 > last_id);
            // Requested ids smaller than the current fragment's id cannot
            // exist in the manifest: emit None for each.
            while the_id < frag.id as u32 {
                fragments.push(None);
                last_id = the_id as i64;
                id = id_iter.next();
                the_id = if let Some(id) = id { *id } else { break };
            }

            if the_id == frag.id as u32 {
                fragments.push(Some(FileFragment::new(
                    Arc::new(self.clone()),
                    frag.clone(),
                )));
                last_id = the_id as i64;
                id = id_iter.next();
            }
        }
        fragments
    }
1924
    /// Filter out entries of `addr_or_ids` whose corresponding row address in
    /// `addrs` refers to a deleted or missing row.
    ///
    /// `addr_or_ids` and `addrs` are parallel slices (addresses may equal the
    /// ids themselves when the dataset has no stable row-id index). The
    /// addresses are sorted, walked fragment-by-fragment against each
    /// fragment's deletion vector, and the survivors are mapped back to the
    /// caller's original order via the sort permutation.
    async fn filter_addr_or_ids(&self, addr_or_ids: &[u64], addrs: &[u64]) -> Result<Vec<u64>> {
        if addrs.is_empty() {
            return Ok(Vec::new());
        }

        // Sort addresses, remembering the permutation so results can be
        // restored to input order at the end.
        let mut perm = permutation::sort(addrs);
        let sorted_addrs = perm.apply_slice(addrs);

        // Distinct fragment ids, in ascending order (sorted_addrs is sorted
        // and the fragment id occupies the address's high bits).
        let referenced_frag_ids = sorted_addrs
            .iter()
            .map(|addr| RowAddress::from(*addr).fragment_id())
            .dedup()
            .collect::<Vec<_>>();
        let frags = self.get_frags_from_ordered_ids(&referenced_frag_ids);
        // Load each referenced fragment's deletion vector concurrently;
        // missing fragments contribute a ready Ok(None).
        let dv_futs = frags
            .iter()
            .map(|frag| {
                if let Some(frag) = frag {
                    frag.get_deletion_vector().boxed()
                } else {
                    std::future::ready(Ok(None)).boxed()
                }
            })
            .collect::<Vec<_>>();
        let dvs = stream::iter(dv_futs)
            .buffered(self.object_store.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        // Walk the sorted addresses once, pushing Some(addr) for live rows and
        // None for deleted/missing rows. The unwrap is safe: addrs was checked
        // non-empty above.
        let mut filtered_sorted_addrs = Vec::with_capacity(sorted_addrs.len());
        let mut sorted_addr_iter = sorted_addrs.into_iter().map(RowAddress::from);
        let mut next_addr = sorted_addr_iter.next().unwrap();
        let mut exhausted = false;

        for frag_dv in frags.iter().zip(dvs).zip(referenced_frag_ids.iter()) {
            let ((frag, dv), frag_id) = frag_dv;
            if frag.is_some() {
                if let Some(dv) = dv.as_ref() {
                    // Merge-walk the fragment's addresses against its sorted
                    // deletion offsets.
                    for deleted in dv.to_sorted_iter() {
                        // Addresses before the next deleted offset are live.
                        while next_addr.fragment_id() == *frag_id
                            && next_addr.row_offset() < deleted
                        {
                            filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                        if exhausted {
                            break;
                        }
                        if next_addr.fragment_id() != *frag_id {
                            break;
                        }
                        // Exact hit on a deleted offset: drop the row.
                        if next_addr.row_offset() == deleted {
                            filtered_sorted_addrs.push(None);
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                    }
                }
                if exhausted {
                    break;
                }
                // Remaining addresses in this fragment are past every deleted
                // offset (or there was no deletion vector): all live.
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            } else {
                // Fragment does not exist: every address in it is filtered out.
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(None);
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            }
        }

        // Restore input order, then keep only the ids whose address survived.
        perm.apply_inv_slice_in_place(&mut filtered_sorted_addrs);
        Ok(addr_or_ids
            .iter()
            .zip(filtered_sorted_addrs)
            .filter_map(|(addr_or_id, maybe_addr)| maybe_addr.map(|_| *addr_or_id))
            .collect())
    }
2036
    /// Filter out ids that refer to deleted rows.
    ///
    /// With a stable row-id index, ids are first translated to row addresses;
    /// without one, the ids are themselves addresses.
    pub(crate) async fn filter_deleted_ids(&self, ids: &[u64]) -> Result<Vec<u64>> {
        let addresses = if let Some(row_id_index) = get_row_id_index(self).await? {
            // NOTE(review): ids absent from the index are silently dropped
            // here, which makes `addresses` shorter than `ids`, while
            // filter_addr_or_ids zips the two slices positionally — presumably
            // every id resolves, or the misalignment is acceptable; confirm.
            let addresses = ids
                .iter()
                .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
                .collect::<Vec<_>>();
            Cow::Owned(addresses)
        } else {
            Cow::Borrowed(ids)
        };

        self.filter_addr_or_ids(ids, &addresses).await
    }
2050
    /// Count fragments whose physical row count is below `max_rows_per_group`
    /// (i.e. "small files" that are candidates for compaction).
    pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
        // NOTE(review): items here are `Result`s; `try_filter` only filters
        // the Ok values and passes Err items through, so `count()` also counts
        // fragments whose physical_rows() call failed — confirm whether errors
        // should instead be dropped or surfaced.
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.physical_rows().await })
            .buffered(self.object_store.io_parallelism())
            .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group))
            .count()
            .await
    }
2063
    /// Validate the dataset's invariants.
    ///
    /// Checks that fragment ids are unique and sorted ascending, validates
    /// each fragment concurrently, and validates the loaded indices.
    ///
    /// # Errors
    /// Returns `Error::corrupt_file` describing the first violated invariant.
    pub async fn validate(&self) -> Result<()> {
        // Fragment ids must be unique.
        let id_counts =
            self.manifest
                .fragments
                .iter()
                .map(|f| f.id)
                .fold(HashMap::new(), |mut acc, id| {
                    *acc.entry(id).or_insert(0) += 1;
                    acc
                });
        for (id, count) in id_counts {
            if count > 1 {
                return Err(Error::corrupt_file(
                    self.base.clone(),
                    format!(
                        "Duplicate fragment id {} found in dataset {:?}",
                        id, self.base
                    ),
                    location!(),
                ));
            }
        }

        // Fragment ids must be in non-decreasing order.
        self.manifest
            .fragments
            .iter()
            .map(|f| f.id)
            .try_fold(0, |prev, id| {
                if id < prev {
                    Err(Error::corrupt_file(
                        self.base.clone(),
                        format!(
                            "Fragment ids are not sorted in increasing fragment-id order. Found {} after {} in dataset {:?}",
                            id, prev, self.base
                        ),
                        location!(),
                    ))
                } else {
                    Ok(id)
                }
            })?;

        // Validate each fragment's own invariants concurrently.
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.validate().await })
            .buffer_unordered(self.object_store.io_parallelism())
            .try_collect::<Vec<()>>()
            .await?;

        let indices = self.load_indices().await?;
        self.validate_indices(&indices)?;

        Ok(())
    }
2121
    /// Validate index invariants: unique index UUIDs and no index covering the
    /// same fragment twice.
    fn validate_indices(&self, indices: &[IndexMetadata]) -> Result<()> {
        let mut index_ids = HashSet::new();
        for index in indices.iter() {
            // insert() returns false on duplicates.
            if !index_ids.insert(&index.uuid) {
                return Err(Error::corrupt_file(
                    self.manifest_location.path.clone(),
                    format!(
                        "Duplicate index id {} found in dataset {:?}",
                        &index.uuid, self.base
                    ),
                    location!(),
                ));
            }
        }

        if let Err(err) = detect_overlapping_fragments(indices) {
            // Collect every offending index into one error message.
            let mut message = "Overlapping fragments detected in dataset.".to_string();
            for (index_name, overlapping_frags) in err.bad_indices {
                message.push_str(&format!(
                    "\nIndex {:?} has overlapping fragments: {:?}",
                    index_name, overlapping_frags
                ));
            }
            return Err(Error::corrupt_file(
                self.manifest_location.path.clone(),
                message,
                location!(),
            ));
        };

        Ok(())
    }
2156
    /// Migrate this dataset's manifest files to the V2 naming scheme, then
    /// re-checkout the latest version so `self` reflects the migrated layout.
    pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> {
        migrate_scheme_to_v2(self.object_store(), &self.base).await?;
        let latest_version = self.latest_version_id().await?;
        *self = self.checkout_version(latest_version).await?;
        Ok(())
    }
2198
    /// Create a shallow clone of this dataset at `target_path`.
    ///
    /// Only a `Clone` commit referencing this dataset (by URI and resolved
    /// version) is written; no data files are copied.
    pub async fn shallow_clone(
        &mut self,
        target_path: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        let (ref_name, version_number) = self.resolve_reference(version.into()).await?;
        let clone_op = Operation::Clone {
            is_shallow: true,
            ref_name,
            ref_version: version_number,
            ref_path: self.uri.clone(),
            branch_name: None,
        };
        let transaction = Transaction::new(version_number, clone_op, None);

        // Fall back to this dataset's own store params when none are given.
        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
            .with_store_params(
                store_params.unwrap_or(self.store_params.as_deref().cloned().unwrap_or_default()),
            )
            .with_object_store(Arc::new(self.object_store().clone()))
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        builder.execute(transaction).await
    }
2228
    /// Create a deep clone of this dataset at `target_path`, physically
    /// copying data, deletion, and index files before committing a `Clone`
    /// operation.
    ///
    /// # Errors
    /// Returns `DatasetAlreadyExists` if `target_path` already resolves to a
    /// dataset.
    pub async fn deep_clone(
        &mut self,
        target_path: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        use futures::StreamExt;

        let src_ds = self.checkout_version(version).await?;
        let src_paths = src_ds.collect_paths().await?;

        let (target_store, target_base) = ObjectStore::from_uri_and_params(
            self.session.store_registry(),
            target_path,
            &store_params.clone().unwrap_or_default(),
        )
        .await?;

        // NOTE(review): any resolve error (e.g. a transient I/O failure) is
        // treated as "target does not exist" here — confirm that is intended.
        if self
            .commit_handler
            .resolve_latest_location(&target_base, &target_store)
            .await
            .is_ok()
        {
            return Err(Error::DatasetAlreadyExists {
                uri: target_path.to_string(),
                location: location!(),
            });
        }

        // Join a relative path onto a base, segment by segment.
        let build_absolute_path = |relative_path: &str, base: &Path| -> Path {
            let mut path = base.clone();
            for seg in relative_path.split('/') {
                if !seg.is_empty() {
                    path = path.child(seg);
                }
            }
            path
        };

        // Copy all source files to the target, bounded by I/O parallelism.
        // NOTE(review): both the source and destination of each copy go
        // through `target_store` — this assumes source and target live on the
        // same object store; confirm for cross-store clones.
        let io_parallelism = self.object_store.io_parallelism();
        let copy_futures = src_paths
            .iter()
            .map(|(relative_path, base)| {
                let store = Arc::clone(&target_store);
                let src_path = build_absolute_path(relative_path, base);
                let target_path = build_absolute_path(relative_path, &target_base);
                async move { store.copy(&src_path, &target_path).await.map(|_| ()) }
            })
            .collect::<Vec<_>>();

        futures::stream::iter(copy_futures)
            .buffer_unordered(io_parallelism)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        // Record the clone's provenance in the new dataset's first commit.
        let ref_name = src_ds.manifest.branch.clone();
        let ref_version = src_ds.manifest_location.version;
        let clone_op = Operation::Clone {
            is_shallow: false,
            ref_name,
            ref_version,
            ref_path: src_ds.uri().to_string(),
            branch_name: None,
        };
        let txn = Transaction::new(ref_version, clone_op, None);
        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
            .with_store_params(store_params.clone().unwrap_or_default())
            .with_object_store(target_store.clone())
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let new_ds = builder.execute(txn).await?;
        Ok(new_ds)
    }
2324
    /// Resolve a [`refs::Ref`] into a `(branch, version_number)` pair.
    ///
    /// A branch without an explicit version resolves to that branch's latest
    /// version; a bare version number resolves on the current branch; a tag
    /// resolves to the branch/version recorded in the tag.
    async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option<String>, u64)> {
        match reference {
            refs::Ref::Version(branch, version_number) => {
                if let Some(version_number) = version_number {
                    Ok((branch, version_number))
                } else {
                    let branch_location = self.branch_location().find_branch(branch.as_deref())?;
                    let version_number = self
                        .commit_handler
                        .resolve_latest_location(&branch_location.path, &self.object_store)
                        .await?
                        .version;
                    Ok((branch, version_number))
                }
            }
            refs::Ref::VersionNumber(version_number) => {
                Ok((self.manifest.branch.clone(), version_number))
            }
            refs::Ref::Tag(tag_name) => {
                let tag_contents = self.tags().get(tag_name.as_str()).await?;
                Ok((tag_contents.branch, tag_contents.version))
            }
        }
    }
2349
2350 async fn collect_paths(&self) -> Result<Vec<(String, Path)>> {
2352 let mut file_paths: Vec<(String, Path)> = Vec::new();
2353 for fragment in self.manifest.fragments.iter() {
2354 if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta {
2355 return Err(Error::Internal {
2356 message: format!(
2357 "External row_id_meta is not supported yet. external file path: {}",
2358 external_file.path
2359 ),
2360 location: location!(),
2361 });
2362 }
2363 for data_file in fragment.files.iter() {
2364 let base_root = if let Some(base_id) = data_file.base_id {
2365 let base_path =
2366 self.manifest
2367 .base_paths
2368 .get(&base_id)
2369 .ok_or_else(|| Error::Internal {
2370 message: format!("base_id {} not found", base_id),
2371 location: location!(),
2372 })?;
2373 Path::parse(base_path.path.as_str())?
2374 } else {
2375 self.base.clone()
2376 };
2377 file_paths.push((
2378 format!("{}/{}", DATA_DIR, data_file.path.clone()),
2379 base_root,
2380 ));
2381 }
2382 if let Some(deletion_file) = &fragment.deletion_file {
2383 let base_root = if let Some(base_id) = deletion_file.base_id {
2384 let base_path =
2385 self.manifest
2386 .base_paths
2387 .get(&base_id)
2388 .ok_or_else(|| Error::Internal {
2389 message: format!("base_id {} not found", base_id),
2390 location: location!(),
2391 })?;
2392 Path::parse(base_path.path.as_str())?
2393 } else {
2394 self.base.clone()
2395 };
2396 file_paths.push((
2397 relative_deletion_file_path(fragment.id, deletion_file),
2398 base_root,
2399 ));
2400 }
2401 }
2402
2403 let indices = read_manifest_indexes(
2404 self.object_store.as_ref(),
2405 &self.manifest_location,
2406 &self.manifest,
2407 )
2408 .await?;
2409
2410 for index in &indices {
2411 let base_root = if let Some(base_id) = index.base_id {
2412 let base_path =
2413 self.manifest
2414 .base_paths
2415 .get(&base_id)
2416 .ok_or_else(|| Error::Internal {
2417 message: format!("base_id {} not found", base_id),
2418 location: location!(),
2419 })?;
2420 Path::parse(base_path.path.as_str())?
2421 } else {
2422 self.base.clone()
2423 };
2424 let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string());
2425 let mut stream = self.object_store.read_dir_all(&index_root, None);
2426 while let Some(meta) = stream.next().await.transpose()? {
2427 if let Some(filename) = meta.location.filename() {
2428 file_paths.push((
2429 format!("{}/{}/{}", INDICES_DIR, index.uuid, filename),
2430 base_root.clone(),
2431 ));
2432 }
2433 }
2434 }
2435 Ok(file_paths)
2436 }
2437
    /// Build a SQL query over a clone of this dataset.
    pub fn sql(&self, sql: &str) -> SqlQueryBuilder {
        SqlQueryBuilder::new(self.clone(), sql)
    }
2444
    /// Whether the dataset's file format version can store nulls for `datatype`.
    ///
    /// Legacy files support nulls only for a fixed set of variable/fixed-width
    /// types; 2.0 supports everything except structs; later versions support
    /// all types.
    pub(crate) fn lance_supports_nulls(&self, datatype: &DataType) -> bool {
        // An unreadable/unknown storage format is conservatively treated as
        // Legacy before resolving to a concrete version.
        match self
            .manifest()
            .data_storage_format
            .lance_file_version()
            .unwrap_or(LanceFileVersion::Legacy)
            .resolve()
        {
            LanceFileVersion::Legacy => matches!(
                datatype,
                DataType::Utf8
                    | DataType::LargeUtf8
                    | DataType::Binary
                    | DataType::List(_)
                    | DataType::FixedSizeBinary(_)
                    | DataType::FixedSizeList(_, _)
            ),
            LanceFileVersion::V2_0 => !matches!(datatype, DataType::Struct(..)),
            _ => true,
        }
    }
2467}
2468
/// Result of [`load_new_transactions`]: the dataset checked out at the newest
/// discovered version plus a stream of the transactions committed after the
/// starting dataset's version.
pub(crate) struct NewTransactionResult<'a> {
    /// Resolves to the dataset at the latest version found, or a clone of the
    /// input dataset when no newer version exists.
    pub dataset: BoxFuture<'a, Result<Dataset>>,
    /// `(version, transaction)` pairs for every version newer than the input
    /// dataset's version.
    pub new_transactions: BoxStream<'a, Result<(u64, Arc<Transaction>)>>,
}
2473
2474pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'_> {
2475 let io_parallelism = dataset.object_store().io_parallelism();
2478 let latest_version = dataset.manifest.version;
2479 let locations = dataset
2480 .commit_handler
2481 .list_manifest_locations(&dataset.base, dataset.object_store(), true)
2482 .try_take_while(move |location| {
2483 futures::future::ready(Ok(location.version > latest_version))
2484 });
2485
2486 let (latest_tx, latest_rx) = tokio::sync::oneshot::channel();
2488 let mut latest_tx = Some(latest_tx);
2489
2490 let manifests = locations
2491 .map_ok(move |location| {
2492 let latest_tx = latest_tx.take();
2493 async move {
2494 let manifest_key = ManifestKey {
2495 version: location.version,
2496 e_tag: location.e_tag.as_deref(),
2497 };
2498 let manifest = if let Some(cached) =
2499 dataset.metadata_cache.get_with_key(&manifest_key).await
2500 {
2501 cached
2502 } else {
2503 let loaded = Arc::new(
2504 Dataset::load_manifest(
2505 dataset.object_store(),
2506 &location,
2507 &dataset.uri,
2508 dataset.session.as_ref(),
2509 )
2510 .await?,
2511 );
2512 dataset
2513 .metadata_cache
2514 .insert_with_key(&manifest_key, loaded.clone())
2515 .await;
2516 loaded
2517 };
2518
2519 if let Some(latest_tx) = latest_tx {
2520 let _ = latest_tx.send((manifest.clone(), location.clone()));
2522 }
2523
2524 Ok((manifest, location))
2525 }
2526 })
2527 .try_buffer_unordered(io_parallelism / 2);
2528 let transactions = manifests
2529 .map_ok(move |(manifest, location)| async move {
2530 let manifest_copy = manifest.clone();
2531 let tx_key = TransactionKey {
2532 version: manifest.version,
2533 };
2534 let transaction =
2535 if let Some(cached) = dataset.metadata_cache.get_with_key(&tx_key).await {
2536 cached
2537 } else {
2538 let dataset_version = Dataset::checkout_manifest(
2539 dataset.object_store.clone(),
2540 dataset.base.clone(),
2541 dataset.uri.clone(),
2542 manifest_copy.clone(),
2543 location,
2544 dataset.session(),
2545 dataset.commit_handler.clone(),
2546 dataset.file_reader_options.clone(),
2547 dataset.store_params.as_deref().cloned(),
2548 )?;
2549 let loaded =
2550 Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
2551 Error::Internal {
2552 message: format!(
2553 "Dataset version {} does not have a transaction file",
2554 manifest_copy.version
2555 ),
2556 location: location!(),
2557 }
2558 })?);
2559 dataset
2560 .metadata_cache
2561 .insert_with_key(&tx_key, loaded.clone())
2562 .await;
2563 loaded
2564 };
2565 Ok((manifest.version, transaction))
2566 })
2567 .try_buffer_unordered(io_parallelism / 2);
2568
2569 let dataset = async move {
2570 if let Ok((latest_manifest, location)) = latest_rx.await {
2571 Dataset::checkout_manifest(
2573 dataset.object_store.clone(),
2574 dataset.base.clone(),
2575 dataset.uri.clone(),
2576 latest_manifest,
2577 location,
2578 dataset.session(),
2579 dataset.commit_handler.clone(),
2580 dataset.file_reader_options.clone(),
2581 dataset.store_params.as_deref().cloned(),
2582 )
2583 } else {
2584 Ok(dataset.clone())
2587 }
2588 }
2589 .boxed();
2590
2591 let new_transactions = transactions.boxed();
2592
2593 NewTransactionResult {
2594 dataset,
2595 new_transactions,
2596 }
2597}
2598
2599impl Dataset {
    /// Add new columns to the dataset, computed by `transforms`.
    ///
    /// `read_columns` limits which existing columns are read to feed the
    /// transforms; `batch_size` overrides the default batch size.
    pub async fn add_columns(
        &mut self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
        batch_size: Option<u32>,
    ) -> Result<()> {
        schema_evolution::add_columns(self, transforms, read_columns, batch_size).await
    }

    /// Apply the given column alterations (rename, cast, nullability, ...).
    pub async fn alter_columns(&mut self, alterations: &[ColumnAlteration]) -> Result<()> {
        schema_evolution::alter_columns(self, alterations).await
    }

    /// Drop the named columns from the dataset schema.
    pub async fn drop_columns(&mut self, columns: &[&str]) -> Result<()> {
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DROPPING_COLUMN_EVENT, uri = &self.uri, columns = columns.join(","));
        schema_evolution::drop_columns(self, columns).await
    }

    #[deprecated(since = "0.9.12", note = "Please use `drop_columns` instead.")]
    pub async fn drop(&mut self, columns: &[&str]) -> Result<()> {
        self.drop_columns(columns).await
    }
2658
2659 async fn merge_impl(
2660 &mut self,
2661 stream: Box<dyn RecordBatchReader + Send>,
2662 left_on: &str,
2663 right_on: &str,
2664 ) -> Result<()> {
2665 if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
2667 return Err(Error::invalid_input(
2668 format!("Column {} does not exist in the left side dataset", left_on),
2669 location!(),
2670 ));
2671 };
2672 let right_schema = stream.schema();
2673 if right_schema.field_with_name(right_on).is_err() {
2674 return Err(Error::invalid_input(
2675 format!(
2676 "Column {} does not exist in the right side dataset",
2677 right_on
2678 ),
2679 location!(),
2680 ));
2681 };
2682 for field in right_schema.fields() {
2683 if field.name() == right_on {
2684 continue;
2687 }
2688 if self.schema().field(field.name()).is_some() {
2689 return Err(Error::invalid_input(
2690 format!(
2691 "Column {} exists in both sides of the dataset",
2692 field.name()
2693 ),
2694 location!(),
2695 ));
2696 }
2697 }
2698
2699 let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
2701 let mut new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
2704 new_schema.set_field_id(Some(self.manifest.max_field_id()));
2705
2706 let updated_fragments: Vec<Fragment> = stream::iter(self.get_fragments())
2709 .then(|f| {
2710 let joiner = joiner.clone();
2711 async move { f.merge(left_on, &joiner).await.map(|f| f.metadata) }
2712 })
2713 .try_collect::<Vec<_>>()
2714 .await?;
2715
2716 let transaction = Transaction::new(
2717 self.manifest.version,
2718 Operation::Merge {
2719 fragments: updated_fragments,
2720 schema: new_schema,
2721 },
2722 None,
2723 );
2724
2725 self.apply_commit(transaction, &Default::default(), &Default::default())
2726 .await?;
2727
2728 Ok(())
2729 }
2730
2731 pub async fn merge(
2743 &mut self,
2744 stream: impl RecordBatchReader + Send + 'static,
2745 left_on: &str,
2746 right_on: &str,
2747 ) -> Result<()> {
2748 let stream = Box::new(stream);
2749 self.merge_impl(stream, left_on, right_on).await
2750 }
2751}
2752
2753impl Dataset {
2766 pub fn metadata(&self) -> &HashMap<String, String> {
2768 &self.manifest.table_metadata
2769 }
2770
2771 pub fn config(&self) -> &HashMap<String, String> {
2773 &self.manifest.config
2774 }
2775
2776 pub(crate) fn blob_version(&self) -> BlobVersion {
2777 blob_version_from_config(&self.manifest.config)
2778 }
2779
2780 #[deprecated(
2782 note = "Use the new update_config(values, replace) method - pass None values to delete keys"
2783 )]
2784 pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
2785 let updates = delete_keys.iter().map(|key| (*key, None));
2786 self.update_config(updates).await?;
2787 Ok(())
2788 }
2789
2790 pub fn update_metadata(
2817 &mut self,
2818 values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
2819 ) -> metadata::UpdateMetadataBuilder<'_> {
2820 metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::TableMetadata)
2821 }
2822
2823 pub fn update_config(
2850 &mut self,
2851 values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
2852 ) -> metadata::UpdateMetadataBuilder<'_> {
2853 metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::Config)
2854 }
2855
2856 pub fn update_schema_metadata(
2883 &mut self,
2884 values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
2885 ) -> metadata::UpdateMetadataBuilder<'_> {
2886 metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::SchemaMetadata)
2887 }
2888
2889 #[deprecated(note = "Use the new update_schema_metadata(values).replace() instead")]
2891 pub async fn replace_schema_metadata(
2892 &mut self,
2893 new_values: impl IntoIterator<Item = (String, String)>,
2894 ) -> Result<()> {
2895 let new_values = new_values
2896 .into_iter()
2897 .map(|(k, v)| (k, Some(v)))
2898 .collect::<HashMap<_, _>>();
2899 self.update_schema_metadata(new_values).replace().await?;
2900 Ok(())
2901 }
2902
2903 pub fn update_field_metadata(&mut self) -> UpdateFieldMetadataBuilder<'_> {
2933 UpdateFieldMetadataBuilder::new(self)
2934 }
2935
2936 pub async fn replace_field_metadata(
2938 &mut self,
2939 new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
2940 ) -> Result<()> {
2941 let new_values = new_values.into_iter().collect::<HashMap<_, _>>();
2942 let field_metadata_updates = new_values
2943 .into_iter()
2944 .map(|(field_id, metadata)| {
2945 (
2946 field_id as i32,
2947 translate_schema_metadata_updates(&metadata),
2948 )
2949 })
2950 .collect();
2951 metadata::execute_metadata_update(
2952 self,
2953 Operation::UpdateConfig {
2954 config_updates: None,
2955 table_metadata_updates: None,
2956 schema_metadata_updates: None,
2957 field_metadata_updates,
2958 },
2959 )
2960 .await
2961 }
2962}
2963
2964#[async_trait::async_trait]
2965impl DatasetTakeRows for Dataset {
2966 fn schema(&self) -> &Schema {
2967 Self::schema(self)
2968 }
2969
2970 async fn take_rows(&self, row_ids: &[u64], projection: &Schema) -> Result<RecordBatch> {
2971 Self::take_rows(self, row_ids, projection.clone()).await
2972 }
2973}
2974
2975#[derive(Debug)]
2976pub(crate) struct ManifestWriteConfig {
2977 auto_set_feature_flags: bool, timestamp: Option<SystemTime>, use_stable_row_ids: bool, use_legacy_format: Option<bool>, storage_format: Option<DataStorageFormat>, disable_transaction_file: bool, }
2984
2985impl Default for ManifestWriteConfig {
2986 fn default() -> Self {
2987 Self {
2988 auto_set_feature_flags: true,
2989 timestamp: None,
2990 use_stable_row_ids: false,
2991 disable_transaction_file: false,
2992 use_legacy_format: None,
2993 storage_format: None,
2994 }
2995 }
2996}
2997
impl ManifestWriteConfig {
    /// Whether writing the separate transaction file is disabled for commits
    /// performed with this configuration.
    pub fn disable_transaction_file(&self) -> bool {
        self.disable_transaction_file
    }
}
3003
3004#[allow(clippy::too_many_arguments)]
3006pub(crate) async fn write_manifest_file(
3007 object_store: &ObjectStore,
3008 commit_handler: &dyn CommitHandler,
3009 base_path: &Path,
3010 manifest: &mut Manifest,
3011 indices: Option<Vec<IndexMetadata>>,
3012 config: &ManifestWriteConfig,
3013 naming_scheme: ManifestNamingScheme,
3014 mut transaction: Option<&Transaction>,
3015) -> std::result::Result<ManifestLocation, CommitError> {
3016 if config.auto_set_feature_flags {
3017 apply_feature_flags(
3018 manifest,
3019 config.use_stable_row_ids,
3020 config.disable_transaction_file,
3021 )?;
3022 }
3023
3024 manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
3025
3026 manifest.update_max_fragment_id();
3027
3028 commit_handler
3029 .commit(
3030 manifest,
3031 indices,
3032 base_path,
3033 object_store,
3034 write_manifest_file_to_path,
3035 naming_scheme,
3036 transaction.take().map(|tx| tx.into()),
3037 )
3038 .await
3039}
3040
3041impl Projectable for Dataset {
3042 fn schema(&self) -> &Schema {
3043 self.schema()
3044 }
3045}
3046
3047#[cfg(test)]
3048mod tests;