1use arrow_array::{RecordBatch, RecordBatchReader};
8use arrow_schema::DataType;
9use byteorder::{ByteOrder, LittleEndian};
10use chrono::{Duration, prelude::*};
11use deepsize::DeepSizeOf;
12use futures::future::BoxFuture;
13use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
14use futures::{FutureExt, Stream};
15
16use crate::dataset::metadata::UpdateFieldMetadataBuilder;
17use crate::dataset::transaction::translate_schema_metadata_updates;
18use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey};
19use crate::session::index_caches::DSIndexCache;
20use itertools::Itertools;
21use lance_core::ROW_ADDR;
22use lance_core::datatypes::{OnMissing, OnTypeMismatch, Projectable, Projection};
23use lance_core::traits::DatasetTakeRows;
24use lance_core::utils::address::RowAddress;
25use lance_core::utils::tracing::{
26 DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT,
27 TRACE_DATASET_EVENTS,
28};
29use lance_datafusion::projection::ProjectionPlan;
30use lance_file::datatypes::populate_schema_dictionary;
31use lance_file::reader::FileReaderOptions;
32use lance_file::version::LanceFileVersion;
33use lance_index::{DatasetIndexExt, IndexType};
34use lance_io::object_store::{
35 LanceNamespaceStorageOptionsProvider, ObjectStore, ObjectStoreParams, StorageOptions,
36 StorageOptionsAccessor, StorageOptionsProvider,
37};
38use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct};
39use lance_namespace::LanceNamespace;
40use lance_table::format::{
41 DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta, pb,
42};
43use lance_table::io::commit::{
44 CommitConfig, CommitError, CommitHandler, CommitLock, ManifestLocation, ManifestNamingScheme,
45 VERSIONS_DIR, external_manifest::ExternalManifestCommitHandler, migrate_scheme_to_v2,
46 write_manifest_file_to_path,
47};
48
49use crate::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
50use lance_table::io::manifest::{read_manifest, read_manifest_indexes};
51use object_store::path::Path;
52use prost::Message;
53use roaring::RoaringBitmap;
54use rowids::get_row_id_index;
55use serde::{Deserialize, Serialize};
56use std::borrow::Cow;
57use std::collections::{BTreeMap, HashMap, HashSet};
58use std::fmt::Debug;
59use std::ops::Range;
60use std::pin::Pin;
61use std::sync::Arc;
62use take::row_offsets_to_row_addresses;
63use tracing::{info, instrument};
64
65pub(crate) mod blob;
66mod branch_location;
67pub mod builder;
68pub mod cleanup;
69pub mod delta;
70pub mod fragment;
71mod hash_joiner;
72pub mod index;
73pub mod mem_wal;
74mod metadata;
75pub mod optimize;
76pub mod progress;
77pub mod refs;
78pub(crate) mod rowids;
79pub mod scanner;
80mod schema_evolution;
81pub mod sql;
82pub mod statistics;
83mod take;
84pub mod transaction;
85pub mod udtf;
86pub mod updater;
87mod utils;
88pub mod write;
89
90use self::builder::DatasetBuilder;
91use self::cleanup::RemovalStats;
92use self::fragment::FileFragment;
93use self::refs::Refs;
94use self::scanner::{DatasetRecordBatchStream, Scanner};
95use self::transaction::{Operation, Transaction, TransactionBuilder, UpdateMapEntry};
96use self::write::write_fragments_internal;
97use crate::dataset::branch_location::BranchLocation;
98use crate::dataset::cleanup::{CleanupPolicy, CleanupPolicyBuilder};
99use crate::dataset::refs::{BranchContents, BranchIdentifier, Branches, Tags};
100use crate::dataset::sql::SqlQueryBuilder;
101use crate::datatypes::Schema;
102use crate::index::retain_supported_indices;
103use crate::io::commit::{
104 commit_detached_transaction, commit_new_dataset, commit_transaction,
105 detect_overlapping_fragments,
106};
107use crate::session::Session;
108use crate::utils::temporal::{SystemTime, timestamp_to_nanos, utc_now};
109use crate::{Error, Result};
110pub use blob::BlobFile;
111use hash_joiner::HashJoiner;
112pub use lance_core::ROW_ID;
113use lance_core::box_error;
114use lance_index::scalar::lance_format::LanceIndexStore;
115use lance_namespace::models::{DeclareTableRequest, DescribeTableRequest};
116use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
117use lance_table::io::deletion::{DELETIONS_DIR, relative_deletion_file_path};
118pub use schema_evolution::{
119 BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
120};
121pub use take::TakeBuilder;
122pub use write::merge_insert::{
123 MergeInsertBuilder, MergeInsertJob, MergeStats, UncommittedMergeInsert, WhenMatched,
124 WhenNotMatched, WhenNotMatchedBySource,
125};
126
127use crate::dataset::index::LanceIndexStoreExt;
128pub use write::update::{UpdateBuilder, UpdateJob};
129#[allow(deprecated)]
130pub use write::{
131 AutoCleanupParams, CommitBuilder, DeleteBuilder, DeleteResult, InsertBuilder, WriteDestination,
132 WriteMode, WriteParams, write_fragments,
133};
134
/// Directory (under the dataset root) holding index data.
pub(crate) const INDICES_DIR: &str = "_indices";
/// Directory (under the dataset root) holding data files.
pub(crate) const DATA_DIR: &str = "data";
/// Directory (under the dataset root) holding transaction files.
pub(crate) const TRANSACTIONS_DIR: &str = "_transactions";

/// Default capacity of the index cache, in bytes (6 GiB).
pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024;
/// Default capacity of the metadata cache, in bytes (1 GiB).
pub const DEFAULT_METADATA_CACHE_SIZE: usize = 1024 * 1024 * 1024;
146
/// A handle to one checked-out version of a Lance dataset.
///
/// Cloning is cheap: the heavyweight state (manifest, stores, caches) is
/// shared behind `Arc`s.
#[derive(Clone)]
pub struct Dataset {
    pub object_store: Arc<ObjectStore>,
    /// Strategy used to commit new versions (e.g. an external manifest store).
    pub(crate) commit_handler: Arc<dyn CommitHandler>,
    /// URI the dataset was opened with.
    uri: String,
    /// Root path of the dataset within the object store.
    pub(crate) base: Path,
    /// Manifest describing the currently checked-out version.
    pub manifest: Arc<Manifest>,
    /// Where (and under which naming scheme) `manifest` was read from.
    pub(crate) manifest_location: ManifestLocation,
    /// Shared session whose caches span datasets.
    pub(crate) session: Arc<Session>,
    /// Access to this dataset's tags and branches.
    pub refs: Refs,

    /// Bitmap of the fragment ids present in `manifest`.
    pub(crate) fragment_bitmap: Arc<RoaringBitmap>,

    /// Per-dataset view of the session's index cache.
    pub(crate) index_cache: Arc<DSIndexCache>,
    /// Per-dataset view of the session's metadata cache.
    pub(crate) metadata_cache: Arc<DSMetadataCache>,

    /// Optional reader tuning forwarded when opening data files.
    pub(crate) file_reader_options: Option<FileReaderOptions>,

    /// Store parameters the dataset was opened with; propagated to
    /// checkouts of other versions/branches.
    pub(crate) store_params: Option<Box<ObjectStoreParams>>,
}
179
180impl std::fmt::Debug for Dataset {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 f.debug_struct("Dataset")
183 .field("uri", &self.uri)
184 .field("base", &self.base)
185 .field("version", &self.manifest.version)
186 .field("cache_num_items", &self.session.approx_num_items())
187 .finish()
188 }
189}
190
/// A serializable summary of one dataset version.
#[derive(Deserialize, Serialize, Debug)]
pub struct Version {
    /// Monotonically increasing version id.
    pub version: u64,

    /// Timestamp of the version, taken from the manifest.
    pub timestamp: DateTime<Utc>,

    /// Key/value metadata, derived from the manifest summary.
    pub metadata: BTreeMap<String, String>,
}
203
/// Summarize a manifest as a [`Version`] (id, timestamp, metadata summary).
impl From<&Manifest> for Version {
    fn from(m: &Manifest) -> Self {
        Self {
            version: m.version,
            timestamp: m.timestamp(),
            metadata: m.summary().into(),
        }
    }
}
214
/// Options controlling how a dataset is opened and read.
#[derive(Clone, Debug)]
pub struct ReadParams {
    /// Capacity of the index cache, in bytes (default: 6 GiB).
    pub index_cache_size_bytes: usize,

    /// Capacity of the metadata cache, in bytes (default: 1 GiB).
    pub metadata_cache_size_bytes: usize,

    /// Optional existing session to share caches across datasets.
    pub session: Option<Arc<Session>>,

    /// Optional parameters used when constructing the object store.
    pub store_options: Option<ObjectStoreParams>,

    /// Optional custom commit handler (e.g. wrapping a commit lock).
    pub commit_handler: Option<Arc<dyn CommitHandler>>,

    /// Optional tuning options passed to file readers.
    pub file_reader_options: Option<FileReaderOptions>,
}
256
257impl ReadParams {
258 #[deprecated(
260 since = "0.30.0",
261 note = "Use `index_cache_size_bytes` instead, which accepts a size in bytes."
262 )]
263 pub fn index_cache_size(&mut self, cache_size: usize) -> &mut Self {
264 let assumed_entry_size = 20 * 1024 * 1024; self.index_cache_size_bytes = cache_size * assumed_entry_size;
266 self
267 }
268
269 pub fn index_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
270 self.index_cache_size_bytes = cache_size;
271 self
272 }
273
274 #[deprecated(
276 since = "0.30.0",
277 note = "Use `metadata_cache_size_bytes` instead, which accepts a size in bytes."
278 )]
279 pub fn metadata_cache_size(&mut self, cache_size: usize) -> &mut Self {
280 let assumed_entry_size = 10 * 1024 * 1024; self.metadata_cache_size_bytes = cache_size * assumed_entry_size;
282 self
283 }
284
285 pub fn metadata_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
287 self.metadata_cache_size_bytes = cache_size;
288 self
289 }
290
291 pub fn session(&mut self, session: Arc<Session>) -> &mut Self {
293 self.session = Some(session);
294 self
295 }
296
297 pub fn set_commit_lock<T: CommitLock + Send + Sync + 'static>(&mut self, lock: Arc<T>) {
299 self.commit_handler = Some(Arc::new(lock));
300 }
301
302 pub fn file_reader_options(&mut self, options: FileReaderOptions) -> &mut Self {
304 self.file_reader_options = Some(options);
305 self
306 }
307}
308
309impl Default for ReadParams {
310 fn default() -> Self {
311 Self {
312 index_cache_size_bytes: DEFAULT_INDEX_CACHE_SIZE,
313 metadata_cache_size_bytes: DEFAULT_METADATA_CACHE_SIZE,
314 session: None,
315 store_options: None,
316 commit_handler: None,
317 file_reader_options: None,
318 }
319 }
320}
321
/// A requested column projection: either a concrete (sub)schema or a
/// list of string pairs handed to the SQL expression planner.
#[derive(Debug, Clone)]
pub enum ProjectionRequest {
    /// Project to exactly this schema.
    Schema(Arc<Schema>),
    /// String pairs passed to `ProjectionPlan::from_expressions`.
    Sql(Vec<(String, String)>),
}
327
328impl ProjectionRequest {
329 pub fn from_columns(
330 columns: impl IntoIterator<Item = impl AsRef<str>>,
331 dataset_schema: &Schema,
332 ) -> Self {
333 let columns = columns
334 .into_iter()
335 .map(|s| s.as_ref().to_string())
336 .collect::<Vec<_>>();
337
338 let schema = dataset_schema
339 .project_preserve_system_columns(&columns)
340 .unwrap();
341 Self::Schema(Arc::new(schema))
342 }
343
344 pub fn from_schema(schema: Schema) -> Self {
345 Self::Schema(Arc::new(schema))
346 }
347
348 pub fn from_sql(
354 columns: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
355 ) -> Self {
356 Self::Sql(
357 columns
358 .into_iter()
359 .map(|(a, b)| (a.into(), b.into()))
360 .collect(),
361 )
362 }
363
364 pub fn into_projection_plan(self, dataset: Arc<Dataset>) -> Result<ProjectionPlan> {
365 match self {
366 Self::Schema(schema) => {
367 let system_columns_present = schema
370 .fields
371 .iter()
372 .any(|f| lance_core::is_system_column(&f.name));
373
374 if system_columns_present {
375 ProjectionPlan::from_schema(dataset, schema.as_ref())
378 } else {
379 let projection = dataset.schema().project_by_schema(
381 schema.as_ref(),
382 OnMissing::Error,
383 OnTypeMismatch::Error,
384 )?;
385 ProjectionPlan::from_schema(dataset, &projection)
386 }
387 }
388 Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns),
389 }
390 }
391}
392
impl From<Arc<Schema>> for ProjectionRequest {
    /// Treat a shared schema as a schema projection request.
    fn from(schema: Arc<Schema>) -> Self {
        Self::Schema(schema)
    }
}

impl From<Schema> for ProjectionRequest {
    /// Treat an owned schema as a schema projection request.
    fn from(schema: Schema) -> Self {
        Self::from(Arc::new(schema))
    }
}
404
405impl Dataset {
    #[instrument]
    /// Open an existing dataset at `uri` with default options.
    ///
    /// Equivalent to `DatasetBuilder::from_uri(uri).load()`.
    pub async fn open(uri: &str) -> Result<Self> {
        DatasetBuilder::from_uri(uri).load().await
    }
413
414 pub async fn checkout_version(&self, version: impl Into<refs::Ref>) -> Result<Self> {
416 let reference: refs::Ref = version.into();
417 match reference {
418 refs::Ref::Version(branch, version_number) => {
419 self.checkout_by_ref(version_number, branch.as_deref())
420 .await
421 }
422 refs::Ref::VersionNumber(version_number) => {
423 self.checkout_by_ref(Some(version_number), self.manifest.branch.as_deref())
424 .await
425 }
426 refs::Ref::Tag(tag_name) => {
427 let tag_contents = self.tags().get(tag_name.as_str()).await?;
428 self.checkout_by_ref(Some(tag_contents.version), tag_contents.branch.as_deref())
429 .await
430 }
431 }
432 }
433
    /// Accessor for this dataset's tags.
    pub fn tags(&self) -> Tags<'_> {
        self.refs.tags()
    }

    /// Accessor for this dataset's branches.
    pub fn branches(&self) -> Branches<'_> {
        self.refs.branches()
    }
441
442 pub async fn checkout_latest(&mut self) -> Result<()> {
444 let (manifest, manifest_location) = self.latest_manifest().await?;
445 self.manifest = manifest;
446 self.manifest_location = manifest_location;
447 self.fragment_bitmap = Arc::new(
448 self.manifest
449 .fragments
450 .iter()
451 .map(|f| f.id as u32)
452 .collect(),
453 );
454 Ok(())
455 }
456
    /// Check out the latest version of the given branch.
    pub async fn checkout_branch(&self, branch: &str) -> Result<Self> {
        self.checkout_by_ref(None, Some(branch)).await
    }
461
    /// Create a branch named `branch` at `version` and return a handle to
    /// the newly created branch dataset.
    ///
    /// Commits a shallow `Clone` transaction into the branch location
    /// (pointing back at this dataset's URI and the resolved version),
    /// then registers the branch in the ref store.
    pub async fn create_branch(
        &mut self,
        branch: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        // Resolve the requested ref to a concrete (branch, version) pair.
        let (source_branch, version_number) = self.resolve_reference(version.into()).await?;
        let branch_location = self.branch_location().find_branch(Some(branch))?;
        // A branch starts life as a shallow clone of this dataset.
        let clone_op = Operation::Clone {
            is_shallow: true,
            ref_name: source_branch.clone(),
            ref_version: version_number,
            ref_path: String::from(self.uri()),
            branch_name: Some(branch.to_string()),
        };
        let transaction = Transaction::new(version_number, clone_op, None);

        let builder = CommitBuilder::new(WriteDestination::Uri(branch_location.uri.as_str()))
            .with_store_params(store_params.unwrap_or_default())
            .with_object_store(Arc::new(self.object_store().clone()))
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let dataset = builder.execute(transaction).await?;

        // The branch ref is registered only after the clone commit
        // succeeds. NOTE(review): a failure between the two steps leaves a
        // committed branch directory without a ref entry — confirm this is
        // the intended recovery story.
        self.branches()
            .create(branch, version_number, source_branch.as_deref())
            .await?;
        Ok(dataset)
    }
507
    /// Delete a branch (non-forced).
    pub async fn delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, false).await
    }

    /// Delete a branch with the `force` flag set.
    pub async fn force_delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, true).await
    }

    /// List all branches and their contents.
    pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
        self.branches().list().await
    }
521
522 fn already_checked_out(&self, location: &ManifestLocation, branch_name: Option<&str>) -> bool {
523 self.manifest.branch.as_deref() == branch_name
526 && self.manifest.version == location.version
527 && self.manifest_location.naming_scheme == location.naming_scheme
528 && location.e_tag.as_ref().is_some_and(|e_tag| {
529 self.manifest_location
530 .e_tag
531 .as_ref()
532 .is_some_and(|current_e_tag| e_tag == current_e_tag)
533 })
534 }
535
    /// Core checkout: resolve `branch`/`version_number` to a manifest
    /// location, load that manifest (unless already checked out), and
    /// build a new handle sharing this one's store, session, and options.
    ///
    /// A `version_number` of `None` means the latest version of the
    /// target branch.
    async fn checkout_by_ref(
        &self,
        version_number: Option<u64>,
        branch: Option<&str>,
    ) -> Result<Self> {
        let new_location = self.branch_location().find_branch(branch)?;

        let manifest_location = if let Some(version_number) = version_number {
            self.commit_handler
                .resolve_version_location(
                    &new_location.path,
                    version_number,
                    &self.object_store.inner,
                )
                .await?
        } else {
            self.commit_handler
                .resolve_latest_location(&new_location.path, &self.object_store)
                .await?
        };

        // Cheap path: the requested branch/version/e-tag is already loaded.
        if self.already_checked_out(&manifest_location, branch) {
            return Ok(self.clone());
        }

        let manifest = Self::load_manifest(
            self.object_store.as_ref(),
            &manifest_location,
            &new_location.uri,
            self.session.as_ref(),
        )
        .await?;
        Self::checkout_manifest(
            self.object_store.clone(),
            new_location.path,
            new_location.uri,
            Arc::new(manifest),
            manifest_location,
            self.session.clone(),
            self.commit_handler.clone(),
            self.file_reader_options.clone(),
            self.store_params.as_deref().cloned(),
        )
    }
580
    /// Read and decode the manifest at `manifest_location`.
    ///
    /// Reads the file's last block once; when the manifest message (and,
    /// opportunistically, the index and transaction sections) fall wholly
    /// inside that block they are decoded without further reads, and the
    /// decoded index metadata / transaction are inserted into the session
    /// caches for `uri`. A missing file maps to `DatasetNotFound`;
    /// unknown reader feature flags map to a "not supported" error.
    pub(crate) async fn load_manifest(
        object_store: &ObjectStore,
        manifest_location: &ManifestLocation,
        uri: &str,
        session: &Session,
    ) -> Result<Manifest> {
        // Use the known object size when available to avoid extra metadata
        // requests.
        let object_reader = if let Some(size) = manifest_location.size {
            object_store
                .open_with_size(&manifest_location.path, size as usize)
                .await
        } else {
            object_store.open(&manifest_location.path).await
        };
        let object_reader = object_reader.map_err(|e| match &e {
            Error::NotFound { uri, .. } => Error::dataset_not_found(uri.clone(), box_error(e)),
            _ => e,
        })?;

        let last_block =
            read_last_block(object_reader.as_ref())
                .await
                .map_err(|err| match err {
                    object_store::Error::NotFound { path, source } => {
                        Error::dataset_not_found(path, source)
                    }
                    _ => Error::io_source(err.into()),
                })?;
        // Byte offset of the manifest message within the file, recorded in
        // the file footer.
        let offset = read_metadata_offset(&last_block)?;

        let manifest_size = object_reader.size().await?;
        // Fast path: the manifest message is wholly inside the last block,
        // so decode from bytes already in memory.
        let mut manifest = if manifest_size - offset <= last_block.len() {
            let manifest_len = manifest_size - offset;
            let offset_in_block = last_block.len() - manifest_len;
            // Length-prefixed protobuf: u32 LE length, then the payload.
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
        } else {
            // Slow path: issue another read at the recorded offset.
            read_struct(object_reader.as_ref(), offset).await
        }?;

        if !can_read_dataset(manifest.reader_feature_flags) {
            let message = format!(
                "This dataset cannot be read by this version of Lance. \
                Please upgrade Lance to read this dataset.\n Flags: {}",
                manifest.reader_feature_flags
            );
            return Err(Error::not_supported_source(message.into()));
        }

        // Opportunistic: if the index section also sits in the last block,
        // decode it now and warm the per-dataset index cache.
        if let Some(index_offset) = manifest.index_section
            && manifest_size - index_offset <= last_block.len()
        {
            let offset_in_block = last_block.len() - (manifest_size - index_offset);
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            let section = lance_table::format::pb::IndexSection::decode(message_data)?;
            let mut indices: Vec<IndexMetadata> = section
                .indices
                .into_iter()
                .map(IndexMetadata::try_from)
                .collect::<Result<Vec<_>>>()?;
            // Drop index types this build cannot serve.
            retain_supported_indices(&mut indices);
            let ds_index_cache = session.index_cache.for_dataset(uri);
            let metadata_key = crate::session::index_caches::IndexMetadataKey {
                version: manifest_location.version,
            };
            ds_index_cache
                .insert_with_key(&metadata_key, Arc::new(indices))
                .await;
        }

        // Same opportunistic decode for the transaction section.
        if let Some(transaction_offset) = manifest.transaction_section
            && manifest_size - transaction_offset <= last_block.len()
        {
            let offset_in_block = last_block.len() - (manifest_size - transaction_offset);
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            let transaction: Transaction =
                lance_table::format::pb::Transaction::decode(message_data)?.try_into()?;

            let metadata_cache = session.metadata_cache.for_dataset(uri);
            let metadata_key = TransactionKey {
                version: manifest_location.version,
            };
            metadata_cache
                .insert_with_key(&metadata_key, Arc::new(transaction))
                .await;
        }

        // Legacy-format files keep dictionary values outside the schema
        // message; load them so the returned schema is complete.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
        }

        Ok(manifest)
    }
684
685 #[allow(clippy::too_many_arguments)]
686 fn checkout_manifest(
687 object_store: Arc<ObjectStore>,
688 base_path: Path,
689 uri: String,
690 manifest: Arc<Manifest>,
691 manifest_location: ManifestLocation,
692 session: Arc<Session>,
693 commit_handler: Arc<dyn CommitHandler>,
694 file_reader_options: Option<FileReaderOptions>,
695 store_params: Option<ObjectStoreParams>,
696 ) -> Result<Self> {
697 let refs = Refs::new(
698 object_store.clone(),
699 commit_handler.clone(),
700 BranchLocation {
701 path: base_path.clone(),
702 uri: uri.clone(),
703 branch: manifest.branch.clone(),
704 },
705 );
706 let metadata_cache = Arc::new(session.metadata_cache.for_dataset(&uri));
707 let index_cache = Arc::new(session.index_cache.for_dataset(&uri));
708 let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
709 Ok(Self {
710 object_store,
711 base: base_path,
712 uri,
713 manifest,
714 manifest_location,
715 commit_handler,
716 session,
717 refs,
718 fragment_bitmap,
719 metadata_cache,
720 index_cache,
721 file_reader_options,
722 store_params: store_params.map(Box::new),
723 })
724 }
725
    /// Write `batches` to `dest`, returning the resulting dataset handle.
    ///
    /// Delegates to [`InsertBuilder`]; `params`, when provided, controls
    /// write mode, storage options, etc.
    pub async fn write(
        batches: impl RecordBatchReader + Send + 'static,
        dest: impl Into<WriteDestination<'_>>,
        params: Option<WriteParams>,
    ) -> Result<Self> {
        let mut builder = InsertBuilder::new(dest);
        if let Some(params) = &params {
            builder = builder.with_params(params);
        }
        // Box::pin keeps this function's own future small by heap-pinning
        // the (large) execute_stream future.
        Box::pin(builder.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>))
            .await
    }
745
    /// Write `batches` into a table managed by a Lance namespace.
    ///
    /// `Create` mode declares the table via `declare_table`; `Append` and
    /// `Overwrite` resolve the existing table via `describe_table`. In
    /// both cases, when the namespace reports managed versioning, commits
    /// are routed through a namespace-backed external manifest store; and
    /// storage options returned by the namespace are merged over any
    /// caller-provided ones (namespace values win on key collisions).
    ///
    /// NOTE(review): the two match arms duplicate the managed-versioning
    /// and storage-option plumbing almost verbatim; a shared helper would
    /// remove the drift risk (left as-is here because the two response
    /// types are not visible in this file).
    pub async fn write_into_namespace(
        batches: impl RecordBatchReader + Send + 'static,
        namespace: Arc<dyn LanceNamespace>,
        table_id: Vec<String>,
        mut params: Option<WriteParams>,
    ) -> Result<Self> {
        let mut write_params = params.take().unwrap_or_default();

        match write_params.mode {
            WriteMode::Create => {
                let declare_request = DeclareTableRequest {
                    id: Some(table_id.clone()),
                    ..Default::default()
                };
                let response = namespace
                    .declare_table(declare_request)
                    .await
                    .map_err(|e| Error::namespace_source(Box::new(e)))?;

                let uri = response.location.ok_or_else(|| {
                    Error::namespace_source(Box::new(std::io::Error::other(
                        "Table location not found in declare_table response",
                    )))
                })?;

                // Managed versioning: route commits through the
                // namespace's external manifest store.
                if response.managed_versioning == Some(true) {
                    let external_store = LanceNamespaceExternalManifestStore::new(
                        namespace.clone(),
                        table_id.clone(),
                    );
                    let commit_handler: Arc<dyn CommitHandler> =
                        Arc::new(ExternalManifestCommitHandler {
                            external_manifest_store: Arc::new(external_store),
                        });
                    write_params.commit_handler = Some(commit_handler);
                }

                if let Some(namespace_storage_options) = response.storage_options {
                    // Provider lets the accessor fetch fresh options from
                    // the namespace later (initial + provider pattern).
                    let provider: Arc<dyn StorageOptionsProvider> = Arc::new(
                        LanceNamespaceStorageOptionsProvider::new(namespace, table_id),
                    );

                    // Namespace-supplied options override caller-supplied
                    // ones on key collisions (extend overwrites).
                    let mut merged_options = write_params
                        .store_params
                        .as_ref()
                        .and_then(|p| p.storage_options().cloned())
                        .unwrap_or_default();
                    merged_options.extend(namespace_storage_options);

                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
                        merged_options,
                        provider,
                    ));

                    let existing_params = write_params.store_params.take().unwrap_or_default();
                    write_params.store_params = Some(ObjectStoreParams {
                        storage_options_accessor: Some(accessor),
                        ..existing_params
                    });
                }

                Self::write(batches, uri.as_str(), Some(write_params)).await
            }
            WriteMode::Append | WriteMode::Overwrite => {
                let request = DescribeTableRequest {
                    id: Some(table_id.clone()),
                    ..Default::default()
                };
                let response = namespace
                    .describe_table(request)
                    .await
                    .map_err(|e| Error::namespace_source(Box::new(e)))?;

                let uri = response.location.ok_or_else(|| {
                    Error::namespace_source(Box::new(std::io::Error::other(
                        "Table location not found in describe_table response",
                    )))
                })?;

                // Same managed-versioning routing as the Create arm.
                if response.managed_versioning == Some(true) {
                    let external_store = LanceNamespaceExternalManifestStore::new(
                        namespace.clone(),
                        table_id.clone(),
                    );
                    let commit_handler: Arc<dyn CommitHandler> =
                        Arc::new(ExternalManifestCommitHandler {
                            external_manifest_store: Arc::new(external_store),
                        });
                    write_params.commit_handler = Some(commit_handler);
                }

                // Same storage-option merging as the Create arm.
                if let Some(namespace_storage_options) = response.storage_options {
                    let provider: Arc<dyn StorageOptionsProvider> =
                        Arc::new(LanceNamespaceStorageOptionsProvider::new(
                            namespace.clone(),
                            table_id.clone(),
                        ));

                    let mut merged_options = write_params
                        .store_params
                        .as_ref()
                        .and_then(|p| p.storage_options().cloned())
                        .unwrap_or_default();
                    merged_options.extend(namespace_storage_options);

                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
                        merged_options,
                        provider,
                    ));

                    let existing_params = write_params.store_params.take().unwrap_or_default();
                    write_params.store_params = Some(ObjectStoreParams {
                        storage_options_accessor: Some(accessor),
                        ..existing_params
                    });
                }

                // Open the existing dataset — honoring any accessor just
                // installed — so the write targets it directly.
                let mut builder = DatasetBuilder::from_uri(uri.as_str());
                if let Some(ref store_params) = write_params.store_params
                    && let Some(accessor) = &store_params.storage_options_accessor
                {
                    builder = builder.with_storage_options_accessor(accessor.clone());
                }
                let dataset = Arc::new(builder.load().await?);

                Self::write(batches, dataset, Some(write_params)).await
            }
        }
    }
895
896 pub async fn append(
900 &mut self,
901 batches: impl RecordBatchReader + Send + 'static,
902 params: Option<WriteParams>,
903 ) -> Result<()> {
904 let write_params = WriteParams {
905 mode: WriteMode::Append,
906 ..params.unwrap_or_default()
907 };
908
909 let new_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone())))
910 .with_params(&write_params)
911 .execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>)
912 .await?;
913
914 *self = new_dataset;
915
916 Ok(())
917 }
918
    /// The URI this dataset handle was opened with.
    pub fn uri(&self) -> &str {
        &self.uri
    }

    /// The (path, uri, branch) triple identifying this handle's branch.
    pub fn branch_location(&self) -> BranchLocation {
        BranchLocation {
            path: self.base.clone(),
            uri: self.uri.clone(),
            branch: self.manifest.branch.clone(),
        }
    }

    /// Resolve this handle's branch to its identifier in the ref store.
    pub async fn branch_identifier(&self) -> Result<BranchIdentifier> {
        self.refs
            .branches()
            .get_identifier(self.manifest.branch.as_deref())
            .await
    }

    /// The manifest of the currently checked-out version.
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }

    /// Where the current manifest was read from.
    pub fn manifest_location(&self) -> &ManifestLocation {
        &self.manifest_location
    }

    /// Start building a delta query over a snapshot of this dataset
    /// (see [`delta::DatasetDeltaBuilder`]).
    pub fn delta(&self) -> delta::DatasetDeltaBuilder {
        delta::DatasetDeltaBuilder::new(self.clone())
    }
965
966 pub(crate) fn is_legacy_storage(&self) -> bool {
968 self.manifest
969 .data_storage_format
970 .lance_file_version()
971 .unwrap()
972 == LanceFileVersion::Legacy
973 }
974
    /// Fetch the latest manifest (and its location) without mutating this
    /// handle.
    ///
    /// Tries, in order: the per-dataset metadata cache, the currently
    /// checked-out manifest (when it already matches the latest
    /// location), and finally a read from storage whose result is cached.
    pub async fn latest_manifest(&self) -> Result<(Arc<Manifest>, ManifestLocation)> {
        let location = self
            .commit_handler
            .resolve_latest_location(&self.base, &self.object_store)
            .await?;

        // Cache key includes the e-tag so a rewritten manifest at the
        // same version is not served stale.
        let manifest_key = ManifestKey {
            version: location.version,
            e_tag: location.e_tag.as_deref(),
        };
        let cached_manifest = self.metadata_cache.get_with_key(&manifest_key).await;
        if let Some(cached_manifest) = cached_manifest {
            return Ok((cached_manifest, location));
        }

        if self.already_checked_out(&location, self.manifest.branch.as_deref()) {
            return Ok((self.manifest.clone(), self.manifest_location.clone()));
        }
        let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?;
        // Legacy-format manifests keep dictionary values outside the
        // schema; load them so the returned schema is complete.
        if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() {
            let reader = if let Some(size) = location.size {
                self.object_store
                    .open_with_size(&location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&location.path).await?
            };
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let manifest_arc = Arc::new(manifest);
        self.metadata_cache
            .insert_with_key(&manifest_key, manifest_arc.clone())
            .await;
        Ok((manifest_arc, location))
    }
1011
    /// Read the transaction recorded for the currently checked-out
    /// version, if any.
    ///
    /// Resolution order: the per-dataset metadata cache, the inline
    /// transaction section of the manifest file, then a standalone
    /// transaction file referenced by the manifest. Returns `Ok(None)`
    /// when the version recorded no transaction.
    pub async fn read_transaction(&self) -> Result<Option<Transaction>> {
        let transaction_key = TransactionKey {
            version: self.manifest.version,
        };
        if let Some(transaction) = self.metadata_cache.get_with_key(&transaction_key).await {
            return Ok(Some((*transaction).clone()));
        }

        let transaction = if let Some(pos) = self.manifest.transaction_section {
            // The transaction is embedded in the manifest file itself.
            let reader = if let Some(size) = self.manifest_location.size {
                self.object_store
                    .open_with_size(&self.manifest_location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&self.manifest_location.path).await?
            };

            let tx: pb::Transaction = read_message(reader.as_ref(), pos).await?;
            Transaction::try_from(tx).map(Some)?
        } else if let Some(path) = &self.manifest.transaction_file {
            // Fallback: the transaction lives in its own file under the
            // transactions directory.
            let path = self.transactions_dir().child(path.as_str());
            let data = self.object_store.inner.get(&path).await?.bytes().await?;
            let transaction = lance_table::format::pb::Transaction::decode(data)?;
            Transaction::try_from(transaction).map(Some)?
        } else {
            None
        };

        // Warm the cache so subsequent reads skip the I/O.
        if let Some(tx) = transaction.as_ref() {
            self.metadata_cache
                .insert_with_key(&transaction_key, Arc::new(tx.clone()))
                .await;
        }
        Ok(transaction)
    }
1053
    /// Read the transaction recorded at a specific `version` by checking
    /// out that version and delegating to [`Self::read_transaction`].
    pub async fn read_transaction_by_version(&self, version: u64) -> Result<Option<Transaction>> {
        let dataset_version = self.checkout_version(version).await?;
        dataset_version.read_transaction().await
    }
1062
1063 pub async fn get_transactions(
1078 &self,
1079 recent_transactions: usize,
1080 ) -> Result<Vec<Option<Transaction>>> {
1081 let mut transactions = vec![];
1082 let mut dataset = self.clone();
1083
1084 loop {
1085 let transaction = dataset.read_transaction().await?;
1086 transactions.push(transaction);
1087
1088 if transactions.len() >= recent_transactions {
1089 break;
1090 } else {
1091 match dataset
1092 .checkout_version(dataset.version().version - 1)
1093 .await
1094 {
1095 Ok(ds) => dataset = ds,
1096 Err(Error::DatasetNotFound { .. }) => break,
1097 Err(err) => return Err(err),
1098 }
1099 }
1100 }
1101
1102 Ok(transactions)
1103 }
1104
    /// Restore the dataset to the currently checked-out version by
    /// committing a `Restore` transaction on top of the latest version.
    pub async fn restore(&mut self) -> Result<()> {
        let (latest_manifest, _) = self.latest_manifest().await?;
        let latest_version = latest_manifest.version;

        // The restore reads from the latest version and rewrites the
        // dataset state to match `self.manifest.version`.
        let transaction = Transaction::new(
            latest_version,
            Operation::Restore {
                version: self.manifest.version,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }
1123
1124 #[instrument(level = "debug", skip(self))]
1145 pub fn cleanup_old_versions(
1146 &self,
1147 older_than: Duration,
1148 delete_unverified: Option<bool>,
1149 error_if_tagged_old_versions: Option<bool>,
1150 ) -> BoxFuture<'_, Result<RemovalStats>> {
1151 let mut builder = CleanupPolicyBuilder::default();
1152 builder = builder.before_timestamp(utc_now() - older_than);
1153 if let Some(v) = delete_unverified {
1154 builder = builder.delete_unverified(v);
1155 }
1156 if let Some(v) = error_if_tagged_old_versions {
1157 builder = builder.error_if_tagged_old_versions(v);
1158 }
1159
1160 self.cleanup_with_policy(builder.build())
1161 }
1162
    #[instrument(level = "debug", skip(self))]
    /// Remove data according to `policy`, returning removal statistics.
    pub fn cleanup_with_policy(
        &self,
        policy: CleanupPolicy,
    ) -> BoxFuture<'_, Result<RemovalStats>> {
        // Emit a tracing event so dataset cleanups are observable.
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.uri);
        cleanup::cleanup_old_versions(self, policy).boxed()
    }
1186
1187 #[allow(clippy::too_many_arguments)]
1188 async fn do_commit(
1189 base_uri: WriteDestination<'_>,
1190 operation: Operation,
1191 read_version: Option<u64>,
1192 store_params: Option<ObjectStoreParams>,
1193 commit_handler: Option<Arc<dyn CommitHandler>>,
1194 session: Arc<Session>,
1195 enable_v2_manifest_paths: bool,
1196 detached: bool,
1197 ) -> Result<Self> {
1198 let read_version = read_version.map_or_else(
1199 || match operation {
1200 Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
1201 _ => Err(Error::invalid_input(
1202 "read_version must be specified for this operation",
1203 )),
1204 },
1205 Ok,
1206 )?;
1207
1208 let transaction = Transaction::new(read_version, operation, None);
1209
1210 let mut builder = CommitBuilder::new(base_uri)
1211 .enable_v2_manifest_paths(enable_v2_manifest_paths)
1212 .with_session(session)
1213 .with_detached(detached);
1214
1215 if let Some(store_params) = store_params {
1216 builder = builder.with_store_params(store_params);
1217 }
1218
1219 if let Some(commit_handler) = commit_handler {
1220 builder = builder.with_commit_handler(commit_handler);
1221 }
1222
1223 builder.execute(transaction).await
1224 }
1225
    /// Commit `operation` to `dest`, creating a new version in the
    /// dataset's history.
    ///
    /// `read_version` is the version the operation was prepared against;
    /// it may be `None` only for `Overwrite`/`Restore` operations.
    /// See [`Self::commit_detached`] for the detached variant.
    pub async fn commit(
        dest: impl Into<WriteDestination<'_>>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
    ) -> Result<Self> {
        Self::do_commit(
            dest.into(),
            operation,
            read_version,
            store_params,
            commit_handler,
            session,
            enable_v2_manifest_paths,
            false, // not detached: the commit becomes the next version
        )
        .await
    }
1281
    /// Commits `operation` as a *detached* version: the resulting manifest is
    /// not part of the dataset's main version history.
    ///
    /// Otherwise identical to [`Self::commit`].
    pub async fn commit_detached(
        dest: impl Into<WriteDestination<'_>>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
    ) -> Result<Self> {
        Self::do_commit(
            dest.into(),
            operation,
            read_version,
            store_params,
            commit_handler,
            session,
            enable_v2_manifest_paths,
            true, // detached = true
        )
        .await
    }
1311
    /// Commits `transaction` against this dataset and updates `self` in
    /// place to point at the newly committed version.
    ///
    /// On success, refreshes the in-memory manifest, manifest location, and
    /// the bitmap of live fragment ids.
    pub(crate) async fn apply_commit(
        &mut self,
        transaction: Transaction,
        write_config: &ManifestWriteConfig,
        commit_config: &CommitConfig,
    ) -> Result<()> {
        let (manifest, manifest_location) = commit_transaction(
            self,
            self.object_store(),
            self.commit_handler.as_ref(),
            &transaction,
            write_config,
            commit_config,
            self.manifest_location.naming_scheme,
            None,
        )
        .await?;

        self.manifest = Arc::new(manifest);
        self.manifest_location = manifest_location;
        // Rebuild the set of fragment ids present in the new manifest.
        self.fragment_bitmap = Arc::new(
            self.manifest
                .fragments
                .iter()
                .map(|f| f.id as u32)
                .collect(),
        );

        Ok(())
    }
1342
1343 pub fn scan(&self) -> Scanner {
1345 Scanner::new(Arc::new(self.clone()))
1346 }
1347
1348 #[instrument(skip_all)]
1352 pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
1353 if let Some(filter) = filter {
1355 let mut scanner = self.scan();
1356 scanner.filter(&filter)?;
1357 Ok(scanner
1358 .project::<String>(&[])?
1359 .with_row_id() .count_rows()
1361 .await? as usize)
1362 } else {
1363 self.count_all_rows().await
1364 }
1365 }
1366
1367 pub(crate) async fn count_all_rows(&self) -> Result<usize> {
1368 let cnts = stream::iter(self.get_fragments())
1369 .map(|f| async move { f.count_rows(None).await })
1370 .buffer_unordered(16)
1371 .try_collect::<Vec<_>>()
1372 .await?;
1373 Ok(cnts.iter().sum())
1374 }
1375
    /// Takes rows by their *offsets* (positions within the dataset) and
    /// returns them as a single [`RecordBatch`] with the requested
    /// projection.
    #[instrument(skip_all, fields(num_rows=row_indices.len()))]
    pub async fn take(
        &self,
        row_indices: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<RecordBatch> {
        take::take(self, row_indices, projection.into()).await
    }
1385
    /// Takes rows by their stable row ids and returns them as a single
    /// [`RecordBatch`] with the requested projection.
    ///
    /// Convenience wrapper over [`Self::take_builder`].
    pub async fn take_rows(
        &self,
        row_ids: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<RecordBatch> {
        Arc::new(self.clone())
            .take_builder(row_ids, projection)?
            .execute()
            .await
    }
1430
    /// Creates a [`TakeBuilder`] for taking the given row ids with the given
    /// projection; the builder allows further configuration before execution.
    pub fn take_builder(
        self: &Arc<Self>,
        row_ids: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<TakeBuilder> {
        TakeBuilder::try_new_from_ids(self.clone(), row_ids.to_vec(), projection.into())
    }
1438
    /// Takes blob values from `column` for the given row ids, returning one
    /// [`BlobFile`] handle per row.
    pub async fn take_blobs(
        self: &Arc<Self>,
        row_ids: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        blob::take_blobs(self, row_ids, column.as_ref()).await
    }
1447
    /// Takes blob values from `column` for the given row *addresses*
    /// (fragment id + offset encoded as u64), returning one [`BlobFile`]
    /// handle per row.
    pub async fn take_blobs_by_addresses(
        self: &Arc<Self>,
        row_addrs: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        blob::take_blobs_by_addresses(self, row_addrs, column.as_ref()).await
    }
1462
    /// Takes blob values from `column` for the given row *offsets*, first
    /// translating offsets to row addresses.
    pub async fn take_blobs_by_indices(
        self: &Arc<Self>,
        row_indices: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?;
        blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await
    }
1472
    /// Streams batches for the given row-offset ranges with `projection`,
    /// reading up to `batch_readahead` batches ahead.
    pub fn take_scan(
        &self,
        row_ranges: Pin<Box<dyn Stream<Item = Result<Range<u64>>> + Send>>,
        projection: Arc<Schema>,
        batch_readahead: usize,
    ) -> DatasetRecordBatchStream {
        take::take_scan(self, row_ranges, projection, batch_readahead)
    }
1484
    /// Returns `n` rows sampled uniformly at random (without replacement),
    /// projected to `projection`.
    ///
    /// If `n` exceeds the row count, all rows are returned. Sampled offsets
    /// are sorted before the take for efficient sequential access.
    pub async fn sample(&self, n: usize, projection: &Schema) -> Result<RecordBatch> {
        use rand::seq::IteratorRandom;
        let num_rows = self.count_rows(None).await?;
        let mut ids = (0..num_rows as u64).choose_multiple(&mut rand::rng(), n);
        ids.sort_unstable();
        self.take(&ids, projection.clone()).await
    }
1496
    /// Deletes all rows matching the SQL `predicate` and commits the result
    /// as a new version. Emits a `DATASET_DELETING_EVENT` trace event.
    pub async fn delete(&mut self, predicate: &str) -> Result<write::delete::DeleteResult> {
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DELETING_EVENT, uri = &self.uri, predicate=predicate);
        write::delete::delete(self, predicate).await
    }
1502
1503 pub async fn truncate_table(&mut self) -> Result<()> {
1505 self.delete("true").await.map(|_| ())
1506 }
1507
    /// Registers additional base paths on the dataset via an `UpdateBases`
    /// transaction and returns the dataset at the resulting new version.
    ///
    /// `transaction_properties` are arbitrary key/value pairs recorded on
    /// the transaction.
    pub async fn add_bases(
        self: &Arc<Self>,
        new_bases: Vec<lance_table::format::BasePath>,
        transaction_properties: Option<HashMap<String, String>>,
    ) -> Result<Self> {
        let operation = Operation::UpdateBases { new_bases };

        let transaction = TransactionBuilder::new(self.manifest.version, operation)
            .transaction_properties(transaction_properties.map(Arc::new))
            .build();

        let new_dataset = CommitBuilder::new(self.clone())
            .execute(transaction)
            .await?;

        Ok(new_dataset)
    }
1540
    /// Counts the total number of deleted rows across all fragments, reading
    /// deletion files with store-level I/O parallelism.
    pub async fn count_deleted_rows(&self) -> Result<usize> {
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.count_deletions().await })
            .buffer_unordered(self.object_store.io_parallelism())
            .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x)))
            .await
    }
1548
    /// Returns the object store this dataset reads from and writes to.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }
1552
1553 pub fn with_object_store(
1559 &self,
1560 object_store: Arc<ObjectStore>,
1561 store_params: Option<ObjectStoreParams>,
1562 ) -> Self {
1563 let mut cloned = self.clone();
1564 cloned.object_store = object_store;
1565 if let Some(store_params) = store_params {
1566 cloned.store_params = Some(Box::new(store_params));
1567 }
1568 cloned
1569 }
1570
    /// Deprecated alias for [`Self::initial_storage_options`].
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
        self.initial_storage_options()
    }
1579
    /// Returns the storage options the dataset was opened with, if any.
    /// These are a snapshot; they may be stale if credentials rotate — see
    /// [`Self::latest_storage_options`].
    pub fn initial_storage_options(&self) -> Option<&HashMap<String, String>> {
        self.store_params
            .as_ref()
            .and_then(|params| params.storage_options())
    }
1588
    /// Returns the dynamic storage-options provider configured on the store
    /// params, if one exists.
    pub fn storage_options_provider(
        &self,
    ) -> Option<Arc<dyn lance_io::object_store::StorageOptionsProvider>> {
        self.store_params
            .as_ref()
            .and_then(|params| params.storage_options_accessor.as_ref())
            .and_then(|accessor| accessor.provider().cloned())
    }
1598
    /// Returns the storage-options accessor from the store params, if one
    /// exists.
    pub fn storage_options_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
        self.store_params
            .as_ref()
            .and_then(|params| params.get_accessor())
    }
1609
1610 pub async fn latest_storage_options(&self) -> Result<Option<StorageOptions>> {
1623 if let Some(accessor) = self.storage_options_accessor() {
1625 let options = accessor.get_storage_options().await?;
1626 return Ok(Some(options));
1627 }
1628
1629 Ok(self.initial_storage_options().cloned().map(StorageOptions))
1631 }
1632
    /// Returns the directory containing the dataset's data files.
    pub fn data_dir(&self) -> Path {
        self.base.child(DATA_DIR)
    }
1636
    /// Returns the directory containing the dataset's index files.
    pub fn indices_dir(&self) -> Path {
        self.base.child(INDICES_DIR)
    }
1640
    /// Returns the directory containing the dataset's transaction files.
    pub fn transactions_dir(&self) -> Path {
        self.base.child(TRANSACTIONS_DIR)
    }
1644
    /// Returns the directory containing the dataset's deletion files.
    pub fn deletions_dir(&self) -> Path {
        self.base.child(DELETIONS_DIR)
    }
1648
    /// Returns the directory containing the dataset's version manifests.
    pub fn versions_dir(&self) -> Path {
        self.base.child(VERSIONS_DIR)
    }
1652
    /// Resolves the directory that contains `data_file`.
    ///
    /// Files without a `base_id` live under the dataset's own data dir.
    /// Files with a `base_id` resolve through the manifest's base-path table:
    /// a dataset-root base gets `DATA_DIR` appended, otherwise the base path
    /// is used as-is.
    pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result<Path> {
        match data_file.base_id.as_ref() {
            Some(base_id) => {
                let base_paths = &self.manifest.base_paths;
                let base_path = base_paths.get(base_id).ok_or_else(|| {
                    Error::invalid_input(format!(
                        "base_path id {} not found for data_file {}",
                        base_id, data_file.path
                    ))
                })?;
                let path = base_path.extract_path(self.session.store_registry())?;
                if base_path.is_dataset_root {
                    Ok(path.child(DATA_DIR))
                } else {
                    Ok(path)
                }
            }
            None => Ok(self.base.child(DATA_DIR)),
        }
    }
1673
    /// Opens an [`ObjectStore`] for the registered base path with id
    /// `base_id`, reusing this dataset's store params.
    pub(crate) async fn object_store_for_base(&self, base_id: u32) -> Result<Arc<ObjectStore>> {
        let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
            Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
        })?;

        let (store, _) = ObjectStore::from_uri_and_params(
            self.session.store_registry(),
            &base_path.path,
            &self.store_params.as_deref().cloned().unwrap_or_default(),
        )
        .await?;

        Ok(store)
    }
1689
    /// Resolves the dataset root directory that `deletion_file` belongs to.
    ///
    /// Deletion files may only live under a dataset root: a non-root base
    /// path is treated as corruption and reported as an internal error.
    pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result<Path> {
        match deletion_file.base_id.as_ref() {
            Some(base_id) => {
                let base_paths = &self.manifest.base_paths;
                let base_path = base_paths.get(base_id).ok_or_else(|| {
                    Error::invalid_input(format!(
                        "base_path id {} not found for deletion_file {:?}",
                        base_id, deletion_file
                    ))
                })?;

                if !base_path.is_dataset_root {
                    return Err(Error::internal(format!(
                        "base_path id {} is not a dataset root for deletion_file {:?}",
                        base_id, deletion_file
                    )));
                }
                base_path.extract_path(self.session.store_registry())
            }
            None => Ok(self.base.clone()),
        }
    }
1712
    /// Resolves the directory that contains the files of `index`.
    ///
    /// Mirrors [`Self::data_file_dir`]: no `base_id` means the dataset's own
    /// indices dir; a dataset-root base gets `INDICES_DIR` appended;
    /// otherwise the base path is used as-is.
    pub(crate) fn indice_files_dir(&self, index: &IndexMetadata) -> Result<Path> {
        match index.base_id.as_ref() {
            Some(base_id) => {
                let base_paths = &self.manifest.base_paths;
                let base_path = base_paths.get(base_id).ok_or_else(|| {
                    Error::invalid_input(format!(
                        "base_path id {} not found for index {}",
                        base_id, index.uuid
                    ))
                })?;
                let path = base_path.extract_path(self.session.store_registry())?;
                if base_path.is_dataset_root {
                    Ok(path.child(INDICES_DIR))
                } else {
                    Ok(path)
                }
            }
            None => Ok(self.base.child(INDICES_DIR)),
        }
    }
1735
    /// Returns a handle to the session (caches, store registry) this dataset
    /// belongs to.
    pub fn session(&self) -> Arc<Session> {
        self.session.clone()
    }
1739
    /// Returns version information for the currently checked-out manifest.
    pub fn version(&self) -> Version {
        Version::from(self.manifest.as_ref())
    }
1743
    /// Returns the number of entries currently in the session index cache.
    pub async fn index_cache_entry_count(&self) -> usize {
        self.session.index_cache.size().await
    }
1748
    /// Returns the session index cache's hit ratio (hits / lookups).
    pub async fn index_cache_hit_rate(&self) -> f32 {
        let stats = self.session.index_cache_stats().await;
        stats.hit_ratio()
    }
1754
    /// Returns the estimated in-memory size of the session (including its
    /// caches) in bytes.
    pub fn cache_size_bytes(&self) -> u64 {
        self.session.deep_size_of() as u64
    }
1758
1759 pub async fn versions(&self) -> Result<Vec<Version>> {
1761 let mut versions: Vec<Version> = self
1762 .commit_handler
1763 .list_manifest_locations(&self.base, &self.object_store, false)
1764 .try_filter_map(|location| async move {
1765 match read_manifest(&self.object_store, &location.path, location.size).await {
1766 Ok(manifest) => Ok(Some(Version::from(&manifest))),
1767 Err(e) => Err(e),
1768 }
1769 })
1770 .try_collect()
1771 .await?;
1772
1773 versions.sort_by_key(|v| v.version);
1775
1776 Ok(versions)
1777 }
1778
    /// Returns the latest committed version number, resolved from storage
    /// (not from the currently checked-out manifest).
    pub async fn latest_version_id(&self) -> Result<u64> {
        Ok(self
            .commit_handler
            .resolve_latest_location(&self.base, &self.object_store)
            .await?
            .version)
    }
1789
    /// Returns the number of fragments in the current manifest.
    pub fn count_fragments(&self) -> usize {
        self.manifest.fragments.len()
    }
1793
    /// Returns the dataset's schema at the current version.
    pub fn schema(&self) -> &Schema {
        &self.manifest.schema
    }
1798
    /// Creates a projection over this dataset that selects no columns.
    pub fn empty_projection(self: &Arc<Self>) -> Projection {
        Projection::empty(self.clone())
    }
1804
    /// Creates a projection over this dataset that selects every column.
    pub fn full_projection(self: &Arc<Self>) -> Projection {
        Projection::full(self.clone())
    }
1809
    /// Returns handles to all fragments, in manifest order. Each handle
    /// shares one `Arc` clone of this dataset.
    pub fn get_fragments(&self) -> Vec<FileFragment> {
        let dataset = Arc::new(self.clone());
        self.manifest
            .fragments
            .iter()
            .map(|f| FileFragment::new(dataset.clone(), f.clone()))
            .collect()
    }
1819
1820 pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
1821 let dataset = Arc::new(self.clone());
1822 let fragment = self
1823 .manifest
1824 .fragments
1825 .iter()
1826 .find(|f| f.id == fragment_id as u64)?;
1827 Some(FileFragment::new(dataset, fragment.clone()))
1828 }
1829
    /// Returns the raw fragment metadata of the current manifest.
    pub fn fragments(&self) -> &Arc<Vec<Fragment>> {
        &self.manifest.fragments
    }
1833
    /// Looks up fragments for a strictly-ascending list of fragment ids.
    ///
    /// Returns one entry per requested id, in order: `Some(fragment)` if the
    /// id exists in the manifest, `None` if it does not. Implemented as a
    /// single merge-join pass over the (sorted) manifest fragments.
    ///
    /// Panics (via `assert!`) if `ordered_ids` is not strictly ascending.
    pub fn get_frags_from_ordered_ids(&self, ordered_ids: &[u32]) -> Vec<Option<FileFragment>> {
        let mut fragments = Vec::with_capacity(ordered_ids.len());
        let mut id_iter = ordered_ids.iter();
        let mut id = id_iter.next();
        // Tracks the previously consumed id to enforce ascending order.
        let mut last_id: i64 = -1;
        for frag in self.manifest.fragments.iter() {
            let mut the_id = if let Some(id) = id { *id } else { break };
            assert!(the_id as i64 > last_id);
            // Requested ids smaller than the current fragment's id do not
            // exist in the manifest: emit None for each of them.
            while the_id < frag.id as u32 {
                fragments.push(None);
                last_id = the_id as i64;
                id = id_iter.next();
                the_id = if let Some(id) = id { *id } else { break };
            }

            if the_id == frag.id as u32 {
                fragments.push(Some(FileFragment::new(
                    Arc::new(self.clone()),
                    frag.clone(),
                )));
                last_id = the_id as i64;
                id = id_iter.next();
            }
        }
        fragments
    }
1866
    /// Filters `addr_or_ids` down to the entries whose corresponding address
    /// in `addrs` refers to a live row (fragment exists and the row is not
    /// deleted).
    ///
    /// `addr_or_ids` and `addrs` are parallel slices: `addrs[i]` is the row
    /// address of `addr_or_ids[i]` — assumes equal length; TODO confirm at
    /// call sites. Works by sorting the addresses, merge-joining them with
    /// each referenced fragment's deletion vector, and finally undoing the
    /// sort to restore input order.
    async fn filter_addr_or_ids(&self, addr_or_ids: &[u64], addrs: &[u64]) -> Result<Vec<u64>> {
        if addrs.is_empty() {
            return Ok(Vec::new());
        }

        // Sort the addresses, remembering the permutation so the keep/drop
        // decisions can be mapped back to the original order at the end.
        let mut perm = permutation::sort(addrs);
        let sorted_addrs = perm.apply_slice(addrs);

        // Fragment ids referenced by the sorted addresses, ascending and
        // de-duplicated.
        let referenced_frag_ids = sorted_addrs
            .iter()
            .map(|addr| RowAddress::from(*addr).fragment_id())
            .dedup()
            .collect::<Vec<_>>();
        let frags = self.get_frags_from_ordered_ids(&referenced_frag_ids);
        // Fetch each fragment's deletion vector; missing fragments yield
        // Ok(None) so they keep their slot in the zip below.
        let dv_futs = frags
            .iter()
            .map(|frag| {
                if let Some(frag) = frag {
                    frag.get_deletion_vector().boxed()
                } else {
                    std::future::ready(Ok(None)).boxed()
                }
            })
            .collect::<Vec<_>>();
        let dvs = stream::iter(dv_futs)
            .buffered(self.object_store.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        // One entry per sorted address: Some(addr) when the row is live,
        // None when it is deleted or its fragment is missing.
        let mut filtered_sorted_addrs = Vec::with_capacity(sorted_addrs.len());

        // sorted_addrs is non-empty here (checked above), so unwrap is safe.
        let mut sorted_addr_iter = sorted_addrs.into_iter().map(RowAddress::from);
        let mut next_addr = sorted_addr_iter.next().unwrap();
        let mut exhausted = false;

        for frag_dv in frags.iter().zip(dvs).zip(referenced_frag_ids.iter()) {
            let ((frag, dv), frag_id) = frag_dv;
            if frag.is_some() {
                if let Some(dv) = dv.as_ref() {
                    // Merge-join the sorted addresses of this fragment with
                    // the sorted deleted offsets.
                    for deleted in dv.to_sorted_iter() {
                        // Addresses before the next deleted offset are live.
                        while next_addr.fragment_id() == *frag_id
                            && next_addr.row_offset() < deleted
                        {
                            filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                        if exhausted {
                            break;
                        }
                        if next_addr.fragment_id() != *frag_id {
                            break;
                        }
                        // Exact hit on a deleted offset: drop this address.
                        if next_addr.row_offset() == deleted {
                            filtered_sorted_addrs.push(None);
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                    }
                }
                if exhausted {
                    break;
                }
                // Remaining addresses in this fragment (past the last deleted
                // offset, or no deletion vector at all) are live.
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            } else {
                // Fragment no longer exists: every address pointing at it is
                // dead.
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(None);
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            }
        }

        // Undo the sort, then keep the original entries whose address
        // survived.
        perm.apply_inv_slice_in_place(&mut filtered_sorted_addrs);
        Ok(addr_or_ids
            .iter()
            .zip(filtered_sorted_addrs)
            .filter_map(|(addr_or_id, maybe_addr)| maybe_addr.map(|_| *addr_or_id))
            .collect())
    }
1978
    /// Filters `ids` down to those that refer to live rows.
    ///
    /// With stable row ids enabled, ids are first translated to addresses
    /// through the row-id index; otherwise ids are assumed to already be
    /// addresses.
    ///
    /// NOTE(review): ids absent from the row-id index are silently skipped
    /// when building `addresses`, which can make `addresses` shorter than
    /// `ids` before the zip in `filter_addr_or_ids` — presumably such ids
    /// are already known-deleted; verify against callers.
    pub(crate) async fn filter_deleted_ids(&self, ids: &[u64]) -> Result<Vec<u64>> {
        let addresses = if let Some(row_id_index) = get_row_id_index(self).await? {
            let addresses = ids
                .iter()
                .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
                .collect::<Vec<_>>();
            Cow::Owned(addresses)
        } else {
            // No row-id index: treat the ids themselves as addresses.
            Cow::Borrowed(ids)
        };

        self.filter_addr_or_ids(ids, &addresses).await
    }
1992
    /// Counts the fragments whose physical row count is below
    /// `max_rows_per_group` (candidates for compaction).
    ///
    /// NOTE(review): `try_filter` passes `Err` items through, and the final
    /// `.count()` counts stream items of either variant — a fragment whose
    /// row count fails to load is counted as "small". Confirm this
    /// best-effort behavior is intended.
    pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.physical_rows().await })
            .buffered(self.object_store.io_parallelism())
            .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group))
            .count()
            .await
    }
2005
    /// Validates dataset invariants: fragment ids are unique and sorted,
    /// every fragment's own metadata is consistent, and indices are valid.
    ///
    /// # Errors
    ///
    /// Returns `Error::corrupt_file` describing the first violation found.
    pub async fn validate(&self) -> Result<()> {
        // Check for duplicate fragment ids.
        let id_counts =
            self.manifest
                .fragments
                .iter()
                .map(|f| f.id)
                .fold(HashMap::new(), |mut acc, id| {
                    *acc.entry(id).or_insert(0) += 1;
                    acc
                });
        for (id, count) in id_counts {
            if count > 1 {
                return Err(Error::corrupt_file(
                    self.base.clone(),
                    format!(
                        "Duplicate fragment id {} found in dataset {:?}",
                        id, self.base
                    ),
                ));
            }
        }

        // Check that fragment ids appear in non-decreasing order.
        self.manifest
            .fragments
            .iter()
            .map(|f| f.id)
            .try_fold(0, |prev, id| {
                if id < prev {
                    Err(Error::corrupt_file(self.base.clone(), format!(
                        "Fragment ids are not sorted in increasing fragment-id order. Found {} after {} in dataset {:?}",
                        id, prev, self.base
                    )))
                } else {
                    Ok(id)
                }
            })?;

        // Validate each fragment's files and deletion data concurrently.
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.validate().await })
            .buffer_unordered(self.object_store.io_parallelism())
            .try_collect::<Vec<()>>()
            .await?;

        // Validate index metadata (unique ids, no overlapping fragments).
        let indices = self.load_indices().await?;
        self.validate_indices(&indices)?;

        Ok(())
    }
2058
    /// Validates index metadata: index UUIDs must be unique and no index may
    /// cover overlapping fragment sets.
    fn validate_indices(&self, indices: &[IndexMetadata]) -> Result<()> {
        // Duplicate UUID check.
        let mut index_ids = HashSet::new();
        for index in indices.iter() {
            if !index_ids.insert(&index.uuid) {
                return Err(Error::corrupt_file(
                    self.manifest_location.path.clone(),
                    format!(
                        "Duplicate index id {} found in dataset {:?}",
                        &index.uuid, self.base
                    ),
                ));
            }
        }

        // Overlapping-fragment check: collect every offending index into a
        // single error message.
        if let Err(err) = detect_overlapping_fragments(indices) {
            let mut message = "Overlapping fragments detected in dataset.".to_string();
            for (index_name, overlapping_frags) in err.bad_indices {
                message.push_str(&format!(
                    "\nIndex {:?} has overlapping fragments: {:?}",
                    index_name, overlapping_frags
                ));
            }
            return Err(Error::corrupt_file(
                self.manifest_location.path.clone(),
                message,
            ));
        };

        Ok(())
    }
2091
    /// Migrates the dataset's manifest files to the V2 naming scheme, then
    /// re-checks-out the latest version in place.
    pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> {
        migrate_scheme_to_v2(self.object_store(), &self.base).await?;
        // Reload so `self` reflects the migrated manifest locations.
        let latest_version = self.latest_version_id().await?;
        *self = self.checkout_version(latest_version).await?;
        Ok(())
    }
2133
    /// Creates a shallow clone of this dataset at `target_path`.
    ///
    /// Only a `Clone` transaction referencing this dataset's data is
    /// committed at the target; no data files are copied. `version` may be a
    /// version number, branch, or tag.
    pub async fn shallow_clone(
        &mut self,
        target_path: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        let (ref_name, version_number) = self.resolve_reference(version.into()).await?;
        let clone_op = Operation::Clone {
            is_shallow: true,
            ref_name,
            ref_version: version_number,
            ref_path: self.uri.clone(),
            branch_name: None,
        };
        let transaction = Transaction::new(version_number, clone_op, None);

        // Reuse this dataset's store params unless the caller overrides them.
        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
            .with_store_params(
                store_params.unwrap_or(self.store_params.as_deref().cloned().unwrap_or_default()),
            )
            .with_object_store(Arc::new(self.object_store().clone()))
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        builder.execute(transaction).await
    }
2163
    /// Creates a deep clone of this dataset at `target_path`: all data,
    /// deletion, and index files of the referenced version are copied and a
    /// `Clone` transaction is committed at the target.
    ///
    /// Fails if a dataset already exists at `target_path`.
    pub async fn deep_clone(
        &mut self,
        target_path: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        use futures::StreamExt;

        // Gather every file path belonging to the source version.
        let src_ds = self.checkout_version(version).await?;
        let src_paths = src_ds.collect_paths().await?;

        let (target_store, target_base) = ObjectStore::from_uri_and_params(
            self.session.store_registry(),
            target_path,
            &store_params.clone().unwrap_or_default(),
        )
        .await?;

        // Refuse to clone over an existing dataset.
        if self
            .commit_handler
            .resolve_latest_location(&target_base, &target_store)
            .await
            .is_ok()
        {
            return Err(Error::dataset_already_exists(target_path.to_string()));
        }

        let build_absolute_path = |relative_path: &str, base: &Path| -> Path {
            let mut path = base.clone();
            for seg in relative_path.split('/') {
                if !seg.is_empty() {
                    path = path.child(seg);
                }
            }
            path
        };

        let io_parallelism = self.object_store.io_parallelism();
        // NOTE(review): copies run through `target_store`, which presumably
        // requires source and target to live on the same object store —
        // confirm for cross-store clones.
        let copy_futures = src_paths
            .iter()
            .map(|(relative_path, base)| {
                let store = Arc::clone(&target_store);
                let src_path = build_absolute_path(relative_path, base);
                let target_path = build_absolute_path(relative_path, &target_base);
                async move { store.copy(&src_path, &target_path).await.map(|_| ()) }
            })
            .collect::<Vec<_>>();

        futures::stream::iter(copy_futures)
            .buffer_unordered(io_parallelism)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        // Commit a (non-shallow) Clone transaction at the target.
        let ref_name = src_ds.manifest.branch.clone();
        let ref_version = src_ds.manifest_location.version;
        let clone_op = Operation::Clone {
            is_shallow: false,
            ref_name,
            ref_version,
            ref_path: src_ds.uri().to_string(),
            branch_name: None,
        };
        let txn = Transaction::new(ref_version, clone_op, None);
        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
            .with_store_params(store_params.clone().unwrap_or_default())
            .with_object_store(target_store.clone())
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let new_ds = builder.execute(txn).await?;
        Ok(new_ds)
    }
2256
    /// Resolves a [`refs::Ref`] to a concrete `(branch, version_number)`
    /// pair.
    ///
    /// Branch refs without a version resolve to the branch's latest version;
    /// bare version numbers keep the current branch; tags resolve through
    /// the tag store.
    async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option<String>, u64)> {
        match reference {
            refs::Ref::Version(branch, version_number) => {
                if let Some(version_number) = version_number {
                    Ok((branch, version_number))
                } else {
                    // No version given: look up the branch's latest.
                    let branch_location = self.branch_location().find_branch(branch.as_deref())?;
                    let version_number = self
                        .commit_handler
                        .resolve_latest_location(&branch_location.path, &self.object_store)
                        .await?
                        .version;
                    Ok((branch, version_number))
                }
            }
            refs::Ref::VersionNumber(version_number) => {
                Ok((self.manifest.branch.clone(), version_number))
            }
            refs::Ref::Tag(tag_name) => {
                let tag_contents = self.tags().get(tag_name.as_str()).await?;
                Ok((tag_contents.branch, tag_contents.version))
            }
        }
    }
2281
    /// Collects every file path belonging to the current version: data
    /// files, deletion files, and index files.
    ///
    /// Returns `(relative_path, base_root)` pairs, where `base_root` is the
    /// dataset base or the registered base path the file lives under.
    /// Fails if any fragment uses external row-id metadata (unsupported).
    async fn collect_paths(&self) -> Result<Vec<(String, Path)>> {
        let mut file_paths: Vec<(String, Path)> = Vec::new();
        for fragment in self.manifest.fragments.iter() {
            if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta {
                return Err(Error::internal(format!(
                    "External row_id_meta is not supported yet. external file path: {}",
                    external_file.path
                )));
            }
            // Data files of this fragment.
            for data_file in fragment.files.iter() {
                let base_root = if let Some(base_id) = data_file.base_id {
                    let base_path =
                        self.manifest.base_paths.get(&base_id).ok_or_else(|| {
                            Error::internal(format!("base_id {} not found", base_id))
                        })?;
                    Path::parse(base_path.path.as_str())?
                } else {
                    self.base.clone()
                };
                file_paths.push((
                    format!("{}/{}", DATA_DIR, data_file.path.clone()),
                    base_root,
                ));
            }
            // Deletion file of this fragment, if any.
            if let Some(deletion_file) = &fragment.deletion_file {
                let base_root = if let Some(base_id) = deletion_file.base_id {
                    let base_path =
                        self.manifest.base_paths.get(&base_id).ok_or_else(|| {
                            Error::internal(format!("base_id {} not found", base_id))
                        })?;
                    Path::parse(base_path.path.as_str())?
                } else {
                    self.base.clone()
                };
                file_paths.push((
                    relative_deletion_file_path(fragment.id, deletion_file),
                    base_root,
                ));
            }
        }

        // Index files: list each index's directory and record every file.
        let indices = read_manifest_indexes(
            self.object_store.as_ref(),
            &self.manifest_location,
            &self.manifest,
        )
        .await?;

        for index in &indices {
            let base_root = if let Some(base_id) = index.base_id {
                let base_path = self
                    .manifest
                    .base_paths
                    .get(&base_id)
                    .ok_or_else(|| Error::internal(format!("base_id {} not found", base_id)))?;
                Path::parse(base_path.path.as_str())?
            } else {
                self.base.clone()
            };
            let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string());
            let mut stream = self.object_store.read_dir_all(&index_root, None);
            while let Some(meta) = stream.next().await.transpose()? {
                if let Some(filename) = meta.location.filename() {
                    file_paths.push((
                        format!("{}/{}/{}", INDICES_DIR, index.uuid, filename),
                        base_root.clone(),
                    ));
                }
            }
        }
        Ok(file_paths)
    }
2355
    /// Creates a builder for running the SQL query `sql` against this
    /// dataset.
    pub fn sql(&self, sql: &str) -> SqlQueryBuilder {
        SqlQueryBuilder::new(self.clone(), sql)
    }
2362
    /// Reports whether this dataset's file format version can store nulls
    /// for `datatype`.
    ///
    /// Legacy files support nulls only for a fixed set of variable-width /
    /// list types; V2.0 supports everything except structs; later versions
    /// support all types.
    pub(crate) fn lance_supports_nulls(&self, datatype: &DataType) -> bool {
        match self
            .manifest()
            .data_storage_format
            .lance_file_version()
            // Unparseable version strings are treated as Legacy.
            .unwrap_or(LanceFileVersion::Legacy)
            .resolve()
        {
            LanceFileVersion::Legacy => matches!(
                datatype,
                DataType::Utf8
                    | DataType::LargeUtf8
                    | DataType::Binary
                    | DataType::List(_)
                    | DataType::FixedSizeBinary(_)
                    | DataType::FixedSizeList(_, _)
            ),
            LanceFileVersion::V2_0 => !matches!(datatype, DataType::Struct(..)),
            _ => true,
        }
    }
2385}
2386
/// Result of [`load_new_transactions`].
pub(crate) struct NewTransactionResult<'a> {
    /// Resolves to the dataset at the newest discovered version, or a clone
    /// of the input dataset when no newer version exists.
    pub dataset: BoxFuture<'a, Result<Dataset>>,
    /// Stream of `(version, transaction)` pairs, one per version newer than
    /// the input dataset's version.
    pub new_transactions: BoxStream<'a, Result<(u64, Arc<Transaction>)>>,
}
2391
2392pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'_> {
2393 let io_parallelism = dataset.object_store().io_parallelism();
2396 let latest_version = dataset.manifest.version;
2397 let locations = dataset
2398 .commit_handler
2399 .list_manifest_locations(&dataset.base, dataset.object_store(), true)
2400 .try_take_while(move |location| {
2401 futures::future::ready(Ok(location.version > latest_version))
2402 });
2403
2404 let (latest_tx, latest_rx) = tokio::sync::oneshot::channel();
2406 let mut latest_tx = Some(latest_tx);
2407
2408 let manifests = locations
2409 .map_ok(move |location| {
2410 let latest_tx = latest_tx.take();
2411 async move {
2412 let manifest_key = ManifestKey {
2413 version: location.version,
2414 e_tag: location.e_tag.as_deref(),
2415 };
2416 let manifest = if let Some(cached) =
2417 dataset.metadata_cache.get_with_key(&manifest_key).await
2418 {
2419 cached
2420 } else {
2421 let loaded = Arc::new(
2422 Dataset::load_manifest(
2423 dataset.object_store(),
2424 &location,
2425 &dataset.uri,
2426 dataset.session.as_ref(),
2427 )
2428 .await?,
2429 );
2430 dataset
2431 .metadata_cache
2432 .insert_with_key(&manifest_key, loaded.clone())
2433 .await;
2434 loaded
2435 };
2436
2437 if let Some(latest_tx) = latest_tx {
2438 let _ = latest_tx.send((manifest.clone(), location.clone()));
2440 }
2441
2442 Ok((manifest, location))
2443 }
2444 })
2445 .try_buffer_unordered(io_parallelism / 2);
2446 let transactions = manifests
2447 .map_ok(move |(manifest, location)| async move {
2448 let manifest_copy = manifest.clone();
2449 let tx_key = TransactionKey {
2450 version: manifest.version,
2451 };
2452 let transaction =
2453 if let Some(cached) = dataset.metadata_cache.get_with_key(&tx_key).await {
2454 cached
2455 } else {
2456 let dataset_version = Dataset::checkout_manifest(
2457 dataset.object_store.clone(),
2458 dataset.base.clone(),
2459 dataset.uri.clone(),
2460 manifest_copy.clone(),
2461 location,
2462 dataset.session(),
2463 dataset.commit_handler.clone(),
2464 dataset.file_reader_options.clone(),
2465 dataset.store_params.as_deref().cloned(),
2466 )?;
2467 let loaded =
2468 Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
2469 Error::internal(format!(
2470 "Dataset version {} does not have a transaction file",
2471 manifest_copy.version
2472 ))
2473 })?);
2474 dataset
2475 .metadata_cache
2476 .insert_with_key(&tx_key, loaded.clone())
2477 .await;
2478 loaded
2479 };
2480 Ok((manifest.version, transaction))
2481 })
2482 .try_buffer_unordered(io_parallelism / 2);
2483
2484 let dataset = async move {
2485 if let Ok((latest_manifest, location)) = latest_rx.await {
2486 Dataset::checkout_manifest(
2488 dataset.object_store.clone(),
2489 dataset.base.clone(),
2490 dataset.uri.clone(),
2491 latest_manifest,
2492 location,
2493 dataset.session(),
2494 dataset.commit_handler.clone(),
2495 dataset.file_reader_options.clone(),
2496 dataset.store_params.as_deref().cloned(),
2497 )
2498 } else {
2499 Ok(dataset.clone())
2502 }
2503 }
2504 .boxed();
2505
2506 let new_transactions = transactions.boxed();
2507
2508 NewTransactionResult {
2509 dataset,
2510 new_transactions,
2511 }
2512}
2513
impl Dataset {
    /// Adds new columns to the dataset, computed by `transforms`.
    ///
    /// `read_columns` restricts which existing columns are read as input to
    /// the transforms; `batch_size` overrides the default read batch size.
    pub async fn add_columns(
        &mut self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
        batch_size: Option<u32>,
    ) -> Result<()> {
        schema_evolution::add_columns(self, transforms, read_columns, batch_size).await
    }

    /// Applies the given column alterations (e.g. rename, nullability, type
    /// change) to the dataset.
    pub async fn alter_columns(&mut self, alterations: &[ColumnAlteration]) -> Result<()> {
        schema_evolution::alter_columns(self, alterations).await
    }

    /// Drops the named columns from the dataset. Emits a
    /// `DATASET_DROPPING_COLUMN_EVENT` trace event.
    pub async fn drop_columns(&mut self, columns: &[&str]) -> Result<()> {
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DROPPING_COLUMN_EVENT, uri = &self.uri, columns = columns.join(","));
        schema_evolution::drop_columns(self, columns).await
    }

    /// Deprecated alias for [`Self::drop_columns`].
    #[deprecated(since = "0.9.12", note = "Please use `drop_columns` instead.")]
    pub async fn drop(&mut self, columns: &[&str]) -> Result<()> {
        self.drop_columns(columns).await
    }

    /// Shared implementation behind [`Self::merge`]: hash-joins `stream`
    /// onto this dataset on `left_on` == `right_on`, then commits the merged
    /// schema and fragments as a `Merge` operation.
    async fn merge_impl(
        &mut self,
        stream: Box<dyn RecordBatchReader + Send>,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        // The left join key must exist in the schema, or be one of the
        // system columns (_rowid / _rowaddr).
        if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
            return Err(Error::invalid_input(format!(
                "Column {} does not exist in the left side dataset",
                left_on
            )));
        };
        // The right join key must exist in the incoming stream's schema.
        let right_schema = stream.schema();
        if right_schema.field_with_name(right_on).is_err() {
            return Err(Error::invalid_input(format!(
                "Column {} does not exist in the right side dataset",
                right_on
            )));
        };
        // No non-key column may exist on both sides.
        for field in right_schema.fields() {
            if field.name() == right_on {
                continue;
            }
            if self.schema().field(field.name()).is_some() {
                return Err(Error::invalid_input(format!(
                    "Column {} exists in both sides of the dataset",
                    field.name()
                )));
            }
        }

        // Hash join: build from the right side, probe fragment by fragment.
        let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
        // Merge schemas, assigning new field ids after the current maximum.
        let mut new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
        new_schema.set_field_id(Some(self.manifest.max_field_id()));

        // Merge each fragment sequentially (`then` preserves order).
        let updated_fragments: Vec<Fragment> = stream::iter(self.get_fragments())
            .then(|f| {
                let joiner = joiner.clone();
                async move { f.merge(left_on, &joiner).await.map(|f| f.metadata) }
            })
            .try_collect::<Vec<_>>()
            .await?;

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::Merge {
                fragments: updated_fragments,
                schema: new_schema,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    /// Merges the columns of `stream` into this dataset by joining on
    /// `left_on` (this dataset) == `right_on` (the stream).
    pub async fn merge(
        &mut self,
        stream: impl RecordBatchReader + Send + 'static,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        let stream = Box::new(stream);
        self.merge_impl(stream, left_on, right_on).await
    }

    /// Merges partial index files (e.g. from distributed index builds) under
    /// `index_uuid` into a single finalized index of the given type.
    ///
    /// `batch_readhead` (sic — name kept for API compatibility) is only used
    /// by the BTree merge.
    pub async fn merge_index_metadata(
        &self,
        index_uuid: &str,
        index_type: IndexType,
        batch_readhead: Option<usize>,
    ) -> Result<()> {
        let store = LanceIndexStore::from_dataset_for_new(self, index_uuid)?;
        let index_dir = self.indices_dir().child(index_uuid);
        match index_type {
            IndexType::Inverted => {
                lance_index::scalar::inverted::builder::merge_index_files(
                    self.object_store(),
                    &index_dir,
                    Arc::new(store),
                )
                .await
            }
            IndexType::BTree => {
                lance_index::scalar::btree::merge_index_files(
                    self.object_store(),
                    &index_dir,
                    Arc::new(store),
                    batch_readhead,
                )
                .await
            }
            IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => {
                crate::index::vector::ivf::finalize_distributed_merge(
                    self.object_store(),
                    &index_dir,
                    Some(index_type),
                )
                .await?;
                Ok(())
            }
            // All other index types cannot be merged this way.
            _ => Err(Error::invalid_input_source(Box::new(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("Unsupported index type (patched): {}", index_type),
            )))),
        }
    }
}
2707
impl Dataset {
    /// User-defined table metadata stored in the current manifest.
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.manifest.table_metadata
    }

    /// Dataset configuration key/value pairs from the current manifest.
    pub fn config(&self) -> &HashMap<String, String> {
        &self.manifest.config
    }

    /// Deprecated helper that removes the given configuration keys.
    #[deprecated(
        note = "Use the new update_config(values, replace) method - pass None values to delete keys"
    )]
    pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
        // A `None` value marks the key for deletion.
        let updates = delete_keys.iter().map(|key| (*key, None));
        self.update_config(updates).await?;
        Ok(())
    }

    /// Start a builder that applies `values` to the table metadata map.
    pub fn update_metadata(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::TableMetadata)
    }

    /// Start a builder that applies `values` to the dataset configuration.
    pub fn update_config(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::Config)
    }

    /// Start a builder that applies `values` to the schema metadata map.
    pub fn update_schema_metadata(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::SchemaMetadata)
    }

    /// Deprecated: replace the entire schema metadata map with `new_values`.
    #[deprecated(note = "Use the new update_schema_metadata(values).replace() instead")]
    pub async fn replace_schema_metadata(
        &mut self,
        new_values: impl IntoIterator<Item = (String, String)>,
    ) -> Result<()> {
        // Wrap every value in `Some` (pure replacement, no deletions) and
        // commit via the replacing builder.
        let new_values = new_values
            .into_iter()
            .map(|(k, v)| (k, Some(v)))
            .collect::<HashMap<_, _>>();
        self.update_schema_metadata(new_values).replace().await?;
        Ok(())
    }

    /// Start a builder that updates metadata on individual schema fields.
    pub fn update_field_metadata(&mut self) -> UpdateFieldMetadataBuilder<'_> {
        UpdateFieldMetadataBuilder::new(self)
    }

    /// Replace the metadata of the given fields, keyed by field id.
    pub async fn replace_field_metadata(
        &mut self,
        new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
    ) -> Result<()> {
        // Collect into a map first so duplicate field ids collapse (the last
        // entry for a given id wins).
        let new_values = new_values.into_iter().collect::<HashMap<_, _>>();
        let field_metadata_updates = new_values
            .into_iter()
            .map(|(field_id, metadata)| {
                (
                    field_id as i32,
                    translate_schema_metadata_updates(&metadata),
                )
            })
            .collect();
        // Field metadata changes ride on an `UpdateConfig` operation with the
        // other update kinds left untouched.
        metadata::execute_metadata_update(
            self,
            Operation::UpdateConfig {
                config_updates: None,
                table_metadata_updates: None,
                schema_metadata_updates: None,
                field_metadata_updates,
            },
        )
        .await
    }
}
2914
#[async_trait::async_trait]
impl DatasetTakeRows for Dataset {
    fn schema(&self) -> &Schema {
        // Delegate to the inherent `Dataset::schema`.
        Self::schema(self)
    }

    async fn take_rows(&self, row_ids: &[u64], projection: &Schema) -> Result<RecordBatch> {
        // Clone so the inherent `take_rows` gets its own copy of the
        // projection (the trait only hands us a borrow).
        Self::take_rows(self, row_ids, projection.clone()).await
    }
}
2925
2926#[derive(Debug)]
2927pub(crate) struct ManifestWriteConfig {
2928 auto_set_feature_flags: bool, timestamp: Option<SystemTime>, use_stable_row_ids: bool, use_legacy_format: Option<bool>, storage_format: Option<DataStorageFormat>, disable_transaction_file: bool, }
2935
2936impl Default for ManifestWriteConfig {
2937 fn default() -> Self {
2938 Self {
2939 auto_set_feature_flags: true,
2940 timestamp: None,
2941 use_stable_row_ids: false,
2942 disable_transaction_file: false,
2943 use_legacy_format: None,
2944 storage_format: None,
2945 }
2946 }
2947}
2948
impl ManifestWriteConfig {
    /// Whether writing the transaction file should be skipped.
    pub fn disable_transaction_file(&self) -> bool {
        self.disable_transaction_file
    }
}
2954
2955#[allow(clippy::too_many_arguments)]
2957pub(crate) async fn write_manifest_file(
2958 object_store: &ObjectStore,
2959 commit_handler: &dyn CommitHandler,
2960 base_path: &Path,
2961 manifest: &mut Manifest,
2962 indices: Option<Vec<IndexMetadata>>,
2963 config: &ManifestWriteConfig,
2964 naming_scheme: ManifestNamingScheme,
2965 mut transaction: Option<&Transaction>,
2966) -> std::result::Result<ManifestLocation, CommitError> {
2967 if config.auto_set_feature_flags {
2968 apply_feature_flags(
2969 manifest,
2970 config.use_stable_row_ids,
2971 config.disable_transaction_file,
2972 )?;
2973 }
2974
2975 manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
2976
2977 manifest.update_max_fragment_id();
2978
2979 commit_handler
2980 .commit(
2981 manifest,
2982 indices,
2983 base_path,
2984 object_store,
2985 write_manifest_file_to_path,
2986 naming_scheme,
2987 transaction.take().map(|tx| tx.into()),
2988 )
2989 .await
2990}
2991
impl Projectable for Dataset {
    fn schema(&self) -> &Schema {
        // Inherent methods take precedence over trait methods during method
        // resolution, so this calls `Dataset::schema` rather than recursing.
        self.schema()
    }
}
2997
2998#[cfg(test)]
2999mod tests;